#!/usr/bin/env python
# $Id$

'''Cobalt Queue Manager'''
__revision__ = '$Revision$'

#
# TODO:
#
# - modify progress routine to catch exceptions and report them using _sm_log_exception.  should some or all exceptions cause
#   the job to be terminated?
#
# - make a pass through the _sm_log calls and decide the correct error levels
#
# Changes:
#
# - (Job) the system_state and user_state fields no longer exist.
#
# - (Job) the admin_hold and user_hold fields are now set to True or False to respectively hold or release a job.
#
# - (Job) setting the preemptable field to true specifies that a task associated with the job may be preempted.
#
# - (Job) the is_runnable field is true when the job is able to run (ready or preempted states) and false otherwise.
#
# - (Job) the is_active field is set to true when the job is in any state other than ready (queued), held, preempted or completed
#   (terminal).
#
# - (Job) the has_resources field is set to true if resources are assigned to the job.  this assumes that resources may not be
#   deallocated until after the resource epilogue scripts have been run and thus remains true until those scripts have completed.
#   in reality, the system component may reacquire control of the resources and possibly reassign them as soon as the task
#   terminates.  this may occur even before cqm notices that the task has ended.  unfortunately, this is the best we can do
#   without acquiring resource state from the system component.
#
# - (Job) the has_completed field is set to true only once the job has finished (reached the terminal state).  a value of true
#   does not imply that the job completed successfully, only that it is not active and may never be run again.
#
# - (Job) the preempts field contains a count of the number of times a task associated with the job has been
#   preempted.  this field is read-only and only exists if the job is preemptable.
#
# - (Job) the maxcptime field specifies the time needed for a task to checkpoint.  if maxcptime is greater than zero, the task
#   will first be signaled to checkpoint (SIGUSR1).  after the specified amount of time elapses, the task will then be signaled
#   to terminate (SIGTERM).  if maxcptime is zero, the task will immediately be signaled to terminate.  (BRT: is this the right
#   thing to do?  how will the job know the difference between being preempted and killed?  is it necessary for it to know the
#   difference when no checkpoint time is requested?)
#
# - (Job) the mintasktime field specifies the minimum amount of time a task must run before it may be preempted.  any
#   preemption requests made prior to the specified amount of time elapsing will be recorded and processed once the specified
#   amount of time has elapsed.  notification to checkpoint will be made prior to preemption as indicated by the maxcptime.
#
# - (Job) the maxtasktime field specifies the maximum amount of time a task will be allowed to run before being preempted.
#   after the specified amount of time has elapsed the task will be signaled to terminate (SIGTERM).  notification to checkpoint
#   will be made prior to preemption as indicated by maxcptime.
#
# - (Job) setting the walltime field is no longer required (at least by the cqm component).  instead, the maxtasktime field
#   may be used to specify repeated timeslices to be given to the job until it terminates.  (BRT: this is a first stab at
#   timeslicing.  eventually, we may want the scheduler component to have some say in whether a job gets preempted after the
#   maximum amount of time has been reached, allowing a job to continue to run past its specified timeslice if no resource
#   contention exists.)
#
# - (Job) the 'force_kill_delay' field specifies the time that must elapse before a task being preempted or killed should be
#   forcibly terminated (with SIGKILL).  if the field is not set in the job instance, the value of 'force_kill_delay' in the cqm
#   section of the config file will be used.  if 'force_kill_delay' is not found in the cqm section of the config file, then the
#   module attribute DEFAULT_FORCE_KILL_DELAY is used (see below).
#
# Assumptions:
#
# - (Job/CQM) the system component will return a non-zero exit status if the task was killed by a signal.  'None' is considered
#   to be non-zero and thus would be a valid exit status if the task was terminated.
#

DEFAULT_FORCE_KILL_DELAY = 5  # (in minutes)
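# Illustrative sketch (not executed here): per the 'force_kill_delay' notes above, the effective delay for a job is
# resolved in this order -- the job's own force_kill_delay field, then the 'force_kill_delay' option in the [cqm]
# section of the config file, then the DEFAULT_FORCE_KILL_DELAY module attribute.  Roughly:
#
#     delay = spec.get("force_kill_delay", get_cqm_config('force_kill_delay', DEFAULT_FORCE_KILL_DELAY))
#
# which is how Job.__init__ below obtains the value.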

import errno
import logging
import os
import pwd
import grp
import sys
import time
import math
import types
import xmlrpclib
import ConfigParser
import signal
import thread
from threading import Thread, Lock
import traceback
import string


import Cobalt
import Cobalt.Util
import Cobalt.Logging  # provides the dbwriter used for database logging below
from Cobalt.Util import Timer, pickle_data, unpickle_data, disk_writer_thread
import Cobalt.Cqparse
from Cobalt.Data import Data, DataList, DataDict, IncrID
from Cobalt.StateMachine import StateMachine
from Cobalt.Components.base import Component, exposed, automatic, query, locking
from Cobalt.Proxy import ComponentProxy
from Cobalt.Exceptions import (QueueError, ComponentLookupError, DataStateError, DataStateTransitionError, StateMachineError,
    StateMachineIllegalEventError, StateMachineNonexistentEventError, ThreadPickledAliveException, JobProcessingError,
    JobRunError, JobPreemptionError, JobDeleteError, IncrIDError, ResourceReservationFailure)
from Cobalt import accounting
from Cobalt.Statistics import Statistics
from Cobalt.Util import get_config_option, init_cobalt_config

init_cobalt_config()

CLOB_SIZE = 4096

logger = logging.getLogger(__name__.split('.')[-1])

cqm_id_gen = None
run_id_gen = None #IncrID()

cqm_forker_tag = "cqm_script"
job_prescript_tag = "job prescript"
job_postscript_tag = "job postscript"
resource_postscript_tag = "resource postscript"

config = ConfigParser.ConfigParser()
config.read(Cobalt.CONFIG_FILES)
if not config.has_section('cqm'):
    print '''"cqm" section missing from cobalt config file'''
    sys.exit(1)

def get_cqm_config(option, default):
    try:
        value = config.get('cqm', option)
    except ConfigParser.NoOptionError:
        value = default
    return value
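
# Example (doctest-style sketch; the option name is hypothetical and the result depends on the local config file):
#
#     >>> get_cqm_config('some_missing_option', 'fallback')
#     'fallback'
#
# Values present in the config file are returned as strings, so callers such as the 'max_queued_msgs' handling below
# convert them explicitly.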

def get_bgsched_config(option, default):
    try:
        value = config.get('bgsched', option)
    except ConfigParser.NoOptionError:
        value = default
    return value

# *AdjEst*
def get_histm_config(option, default):
    try:
        value = config.get('histm', option)
    except ConfigParser.NoSectionError:
        value = default
    return value

CQM_SCALE_DEP_FRAC = str(get_cqm_config('scale_dep_frac', 'false')).lower() in Cobalt.Util.config_true_values

walltime_prediction = get_histm_config("walltime_prediction", "False").lower()   # *AdjEst*
walltime_prediction_configured = False
walltime_prediction_enabled = False
if walltime_prediction  == "true":
    walltime_prediction_configured = True
    walltime_prediction_enabled = True
#print "walltime_prediction_configured=", walltime_prediction_configured 

prediction_scheme = get_histm_config("prediction_scheme", "combined").lower()  # ["project", "user", "combined"]   # *AdjEst*

accounting_logdir = os.path.expandvars(get_cqm_config("log_dir", Cobalt.DEFAULT_LOG_DIRECTORY))
accounting_logger = logging.getLogger("cqm.accounting")
accounting_logger.addHandler(
    accounting.DatetimeFileHandler(os.path.join(accounting_logdir, "%Y%m%d")))


dbwriter = Cobalt.Logging.dbwriter(logging)
use_db_logging = get_cqm_config('use_db_logging','false')
if use_db_logging.lower() in Cobalt.Util.config_true_values:
    dbwriter.enabled = True
    overflow_filename = get_cqm_config('overflow_file', None)
    max_queued = int(get_cqm_config('max_queued_msgs', '-1'))
    if max_queued <= 0:
        max_queued = None
    if (overflow_filename == None) and (max_queued != None):
        logger.warning('No filename set for database logging messages, max_queued_msgs set to unlimited')
    if max_queued != None:
        dbwriter.overflow_filename = overflow_filename
        dbwriter.max_queued = max_queued

# writer thread so that cobalt_log writes don't hang up scheduling.
cobalt_log_writer = disk_writer_thread()
cobalt_log_writer.daemon = True
cobalt_log_writer.start()

def cobalt_log_write(filename, msg):
    '''send the cobalt_log writer thread a filename, msg tuple.

    '''
    cobalt_log_writer.send((filename, msg))

def cobalt_log_terminate():
    '''Terminate the writer thread by sending it None.

    '''
    cobalt_log_writer.send(None)

def str_elapsed_time(elapsed_time):
    return "%d:%02d:%02d" % (elapsed_time / 3600, elapsed_time / 60 % 60, elapsed_time % 60)

def has_private_attr(obj, attr):
    assert attr[0:2] == "__"
    return hasattr(obj, "_" + obj.__class__.__name__ + attr)

def has_semi_private_attr(obj, attr):
    assert attr[0:1] == "_"
    return hasattr(obj, attr)
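
# Example (sketch): has_private_attr() follows Python's name mangling for double-underscore attributes, so a check such
# as
#
#     >>> has_private_attr(job, "__signaling_info")
#
# actually looks for job._Job__signaling_info on a Job instance, while has_semi_private_attr(job, "_sm_state") is a
# plain hasattr() check.  _sm_check_preempt_timers() below uses has_private_attr() in exactly this way.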


class Signal_Map (object):
    checkpoint = 'SIGUSR1'
    terminate = 'SIGTERM'
    force_kill = 'SIGKILL'

class Signal_Info (object):
    class Reason (object):
        delete = 'delete'
        preempt = 'preempt'
        time_limit = 'time limit'

    def __init__(self, reason = None, signame = None, user = None, pending = False):
        self.__reason = reason
        self.__signal = signame
        self.__user = user
        self.__pending = pending

    def __get_reason(self):
        return self.__reason

    def __set_reason(self, reason):
        # valid reasons are the Signal_Info.Reason constants defined above
        if reason in (Signal_Info.Reason.delete, Signal_Info.Reason.preempt, Signal_Info.Reason.time_limit):
            self.__reason = reason
        else:
            raise ValueError, "illegal signal reason: %s" % (reason,)

    reason = property(__get_reason, __set_reason)

    def __get_signal(self):
        return self.__signal

    def __set_signal(self, signame):
        if signame == None:
            self.__signal = None
            return
        if not isinstance(signame, str):
            raise TypeError, "signal name must be a string"
        if signame[0:3] == "SIG" and hasattr(signal, signame):
            self.__signal = signame
        else:
            raise ValueError, "unknown signal name: %s" % (signame,)

    signal = property(__get_signal, __set_signal)

    def __get_user(self):
        return self.__user

    def __set_user(self, user):
        if isinstance(user, str) or user == None:
            self.__user = user
        else:
            raise TypeError, "user name must be a string or None"

    user = property(__get_user, __set_user)

    def __get_pending(self):
        return self.__pending

    def __set_pending(self, pending):
        if isinstance(pending, bool):
            self.__pending = pending
        else:
            raise TypeError, "pending flag must be a boolean value"

    pending = property(__get_pending, __set_pending)
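
# Example (illustrative sketch): a Signal_Info records why, how, and for whom a task is being signaled.  Building the
# information for a user delete, as _sm_signaling_info_set_user_delete() does below, looks roughly like
#
#     >>> info = Signal_Info(Signal_Info.Reason.delete, Signal_Map.terminate, 'someuser', pending=True)
#     >>> info.signal
#     'SIGTERM'
#
# where 'someuser' is a hypothetical user name; assigning an unknown signal name to info.signal raises ValueError.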


#TODO: I want to get rid of this.  I don't want to see threaded-forks in 
# cobalt ever again. --PR
class RunScriptsThread(object):
    '''stub for restart compatibility'''
    def __init__(self):
        pass

def get_job_sm_states():

    return ['Ready',
        'Hold',
        'Job_Prologue',
        'Job_Prologue_Retry',
        'Job_Prologue_Retry_Release',
        'Resource_Prologue',
        'Resource_Prologue_Retry',
        'Resource_Prologue_Retry_Release',
        'Run_Retry',
        'Running',
        'Kill_Retry',
        'Killing',
        'Preempt_Retry',
        'Preempting',
        'Preempt_Finalize_Retry',
        'Preempt_Epilogue',
        'Preempted',
        'Preempted_Hold',
        'Finalize_Retry',
        'Resource_Epilogue',
        'Resource_Epilogue_Retry',
        'Job_Epilogue',
        'Job_Epilogue_Retry']

def get_job_sm_transitions():

    return [
        ('Ready', 'Hold'),                                  # user/admin hold
        ('Ready', 'Job_Prologue'),                          # run; start prologue scripts
        ('Ready', 'Job_Prologue_Retry'),                    # run; error contacting forker component; scripts not running, yet
        ('Ready', 'Terminal'),                              # kill
        ('Hold', 'Ready'),                                  # user/admin release, no holds
        ('Hold', 'Terminal'),                               # kill
        ('Job_Prologue', 'Resource_Prologue'),              # job_prologue scripts complete.  Starting Resource_prologue scripts
        ('Job_Prologue', 'Resource_Prologue_Retry'),        # error contacting forker component
        ('Job_Prologue', 'Job_Prologue_Retry'),             # Lost communication to forker during progress
        ('Job_Prologue', 'Job_Epilogue'),                   # kill / job prologue failed
        ('Job_Prologue', 'Job_Epilogue_Retry'),             # kill / job prologue failed.  Error communicating with forker
        ('Job_Prologue', 'Job_Prologue_Retry_Release'),     # kill / job prologue failed.  error releasing system resources
        ('Job_Prologue_Retry', 'Job_Prologue'),             # forker starting job prologue scripts
        ('Job_Prologue_Retry', 'Terminal'),                 # kill; error contacting forker component
        ('Job_Prologue_Retry_Release', 'Job_Epilogue'),     # resource successfully released
        ('Job_Prologue_Retry_Release', 'Job_Epilogue_Retry'),     # error contacting forker component
        ('Resource_Prologue', 'Run_Retry'),                 # prologue scripts complete; error contacting system component
        ('Resource_Prologue', 'Running'),                   # prologue scripts complete; system component starting task
        ('Resource_Prologue', 'Resource_Prologue_Retry'),   # Lost communication to forker during progress
        ('Resource_Prologue', 'Resource_Epilogue'),         # kill / resource prologue failed; system component released resource
        ('Resource_Prologue', 'Resource_Epilogue_Retry'),   # kill / resource prologue failed; error contacting forker component
        ('Resource_Prologue', 'Resource_Prologue_Retry_Release'), # kill / job_prologue failed.  error releasing system resources
        ('Resource_Prologue_Retry','Resource_Prologue'),    # run resource prologue scripts
        ('Resource_Prologue_Retry','Job_Epilogue'),         # kill; run any required job cleanup
        ('Resource_Prologue_Retry_Release', 'Resource_Epilogue'),       # resource successfully released
        ('Resource_Prologue_Retry_Release', 'Resource_Epilogue_Retry'), # error contacting forker component
        ('Run_Retry', 'Running'),                           # system component starting task
        ('Run_Retry', 'Resource_Epilogue'),                 # kill
        ('Run_Retry', 'Resource_Epilogue_Retry'),           # kill; error contacting forker component
        ('Running', 'Kill_Retry'),                          # kill; error contacting system component
        ('Running', 'Killing'),                             # kill; system component signaling task
        ('Running', 'Preempt_Retry'),                       # preempt; error contacting system component
        ('Running', 'Preempting'),                          # preempt; system component signaling task
        ('Running', 'Finalize_Retry'),                      # task execution complete; error finalizing task and obtaining exit
                                                            #     status
        ('Running', 'Resource_Epilogue'),                   # task execution complete; task finalized and exit status obtained
        ('Running', 'Resource_Epilogue_Retry'),             # task execution complete; error contacting forker component
        ('Kill_Retry', 'Kill_Retry'),                       # handle multiple task signaling failures
        ('Kill_Retry', 'Killing'),                          # system component signaling task
        ('Kill_Retry', 'Finalize_Retry'),                   # task execution complete/terminated; task finalization failed
        ('Kill_Retry', 'Resource_Epilogue'),                # task execution completed/terminated successfully
        ('Kill_Retry', 'Resource_Epilogue_Retry'),          # task execution completed/terminated successfully; error contacting
                                                            #     forker component
        ('Killing', 'Kill_Retry'),                          # new signal, or kill timer expired; error contacting system component
        ('Killing', 'Killing'),                             # kill timer expired; escalating to a forced kill
        ('Killing', 'Finalize_Retry'),                      # task execution complete/terminated; task finalization failed
        ('Killing', 'Resource_Epilogue'),                   # task execution complete/terminated; task finalization successful
        ('Killing', 'Resource_Epilogue_Retry'),             # task execution complete/terminated; task finalization successful;
                                                            #   error contacting forker component
        ('Preempt_Retry', 'Kill_Retry'),                    # kill; error contacting system component
        ('Preempt_Retry', 'Killing'),                       # kill; system component signaling task
        ('Preempt_Retry', 'Preempt_Retry'),                 # handle multiple task signaling failures
        ('Preempt_Retry', 'Preempting'),                    # system component signaling task
        ('Preempt_Retry', 'Preempt_Finalize_Retry'),        # task execution terminated, task finalization failed
        ('Preempt_Retry', 'Preempt_Epilogue'),              # task execution terminated successfully
        ('Preempt_Retry', 'Finalize_Retry'),                # task execution completed, task finalization failed
        ('Preempt_Retry', 'Resource_Epilogue'),             # task execution completed successfully
        ('Preempt_Retry', 'Resource_Epilogue_Retry'),       # task execution completed; error contacting forker component
        ('Preempting', 'Kill_Retry'),                       # kill; new signal, error contacting system component
        ('Preempting', 'Killing'),                          # kill; new signal and system component signaling task,
                                                            #     same signal used for preempt, or attempted signal demotion
        ('Preempting', 'Preempting'),                       # preemption timer expired, escalating to the next signal level
        ('Preempting', 'Preempt_Retry'),                    # preemption timer expired, error contacting system component
        ('Preempting', 'Preempt_Finalize_Retry'),           # task execution terminated, task finalization failed
        ('Preempting', 'Preempt_Epilogue'),                 # task execution complete/terminated; task finalization successful
        ('Preempt_Finalize_Retry', 'Preempt_Epilogue'),     # task finalization successful
        ('Preempt_Epilogue', 'Preempted'),                  # task execution complete/terminated, no holds pending
        ('Preempt_Epilogue', 'Preempted_Hold'),             # task execution complete/terminated, holds pending
        ('Preempt_Epilogue', 'Job_Epilogue'),               # task execution complete/terminated, kill pending
        ('Preempted', 'Resource_Prologue'),                 # run
        ('Preempted', 'Preempted_Hold'),                    # user/admin hold
        ('Preempted', 'Job_Epilogue'),                      # kill
        ('Preempted_Hold', 'Preempted'),                    # user/admin release, no holds
        ('Preempted_Hold', 'Job_Epilogue'),                 # kill
        ('Finalize_Retry', 'Resource_Epilogue'),            # task finalized and exit status obtained (if applicable)
        ('Finalize_Retry', 'Resource_Epilogue_Retry'),      # task finalized, exit status obtained, error contacting forker component
        ('Resource_Epilogue', 'Job_Epilogue'),              # resource epilogue scripts complete
        ('Resource_Epilogue', 'Resource_Epilogue_Retry'),   # Lost communication to forker during progress
        ('Resource_Epilogue', 'Job_Epilogue_Retry'),        # resource epilogue scripts complete; error contacting forker component
        ('Resource_Epilogue_Retry', 'Resource_Epilogue'),   # starting resource epilogue scripts
        ('Job_Epilogue', 'Terminal'),                       # job epilogue scripts complete
        ('Job_Epilogue_Retry', 'Job_Epilogue'),             # starting job_epilogue scripts
        ('Job_Epilogue', 'Job_Epilogue_Retry')              # Lost communication to forker during progress
        ]
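
# Example (sketch): the (old state, new state) pairs above are the legal moves checked by the state machine; a quick
# legality check looks like
#
#     >>> ('Running', 'Killing') in get_job_sm_transitions()
#     True
#     >>> ('Ready', 'Running') in get_job_sm_transitions()
#     False
#
# i.e. a job cannot jump from 'Ready' straight to 'Running'; it must pass through the prologue states first.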

def get_job_sm_initial_state():

    return "Ready"

def get_job_sm_events():

    return ['Run', 'Hold', 'Release', 'Preempt', 'Kill', 'Task_End']

def get_job_sm_seas(job):

    return {('Ready', 'Run') : [job._sm_ready__run],
            ('Ready', 'Hold') : [job._sm_ready__hold],
            ('Ready', 'Release') : [job._sm_ready__release],
            ('Ready', 'Kill') : [job._sm_ready__kill],
            ('Hold', 'Hold') : [job._sm_hold__hold],
            ('Hold', 'Release') : [job._sm_hold__release],
            ('Hold', 'Kill') : [job._sm_hold__kill],
            ('Job_Prologue', 'Progress') : [job._sm_job_prologue__progress],
            ('Job_Prologue', 'Hold') : [job._sm_common__pending_hold], 
            ('Job_Prologue', 'Release') : [job._sm_common__pending_release],
            ('Job_Prologue', 'Preempt') : [job._sm_common__pending_preempt], #custom?
            ('Job_Prologue', 'Kill') : [job._sm_common__pending_kill],
            ('Job_Prologue_Retry', 'Progress') : [job._sm_job_prologue_retry__progress],
            ('Job_Prologue_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Job_Prologue_Retry', 'Release') : [job._sm_common__pending_release],
            ('Job_Prologue_Retry', 'Preempt') : [job._sm_common__pending_preempt], #Must go pending
            ('Job_Prologue_Retry', 'Kill') : [job._sm_job_prologue_retry__kill],
            ('Job_Prologue_Retry_Release', 'Progress') : [job._sm_job_prologue_retry_release__progress],
            ('Job_Prologue_Retry_Release', 'Hold') : [job._sm_exit_common__hold],
            ('Job_Prologue_Retry_Release', 'Release') : [job._sm_exit_common__release],
            ('Job_Prologue_Retry_Release', 'Preempt') : [job._sm_exit_common__preempt],
            ('Job_Prologue_Retry_Release', 'Kill') : [job._sm_exit_common__kill],
            ('Resource_Prologue', 'Progress') : [job._sm_resource_prologue__progress],
            ('Resource_Prologue', 'Hold') : [job._sm_common__pending_hold],
            ('Resource_Prologue', 'Release') : [job._sm_common__pending_release],
            ('Resource_Prologue', 'Preempt') : [job._sm_common__pending_preempt], #custom?
            ('Resource_Prologue', 'Kill') : [job._sm_common__pending_kill],
            ('Resource_Prologue_Retry', 'Progress') : [job._sm_resource_prologue_retry__progress],
            ('Resource_Prologue_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Resource_Prologue_Retry', 'Release') : [job._sm_common__pending_release],
            ('Resource_Prologue_Retry', 'Preempt') : [job._sm_common__pending_preempt], #custom: go directly to preempted?
            ('Resource_Prologue_Retry', 'Kill') : [job._sm_resource_prologue_retry__kill],
            ('Resource_Prologue_Retry_Release', 'Progress') : [job._sm_resource_prologue_retry_release__progress],
            ('Resource_Prologue_Retry_Release', 'Hold') : [job._sm_exit_common__hold],
            ('Resource_Prologue_Retry_Release', 'Release') : [job._sm_exit_common__release],
            ('Resource_Prologue_Retry_Release', 'Preempt') : [job._sm_exit_common__preempt],
            ('Resource_Prologue_Retry_Release', 'Kill') : [job._sm_exit_common__kill],
            ('Run_Retry', 'Progress') : [job._sm_run_retry__progress],
            ('Run_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Run_Retry', 'Release') : [job._sm_common__pending_release],
            ('Run_Retry', 'Preempt') : [job._sm_common__pending_preempt],
            ('Run_Retry', 'Kill') : [job._sm_run_retry__kill],
            ('Running', 'Progress') : [job._sm_running__progress],
            ('Running', 'Hold') : [job._sm_common__pending_hold],
            ('Running', 'Release') : [job._sm_common__pending_release],
            ('Running', 'Preempt') : [job._sm_common__pending_preempt],
            ('Running', 'Kill') : [job._sm_running__kill],
            ('Running', 'Task_End') : [job._sm_running__task_end],
            ('Kill_Retry', 'Progress') : [job._sm_kill_retry__progress],
            ('Kill_Retry', 'Hold') : [job._sm_kill_common__hold],
            ('Kill_Retry', 'Release') : [job._sm_kill_common__release],
            ('Kill_Retry', 'Preempt') : [job._sm_kill_common__preempt],
            ('Kill_Retry', 'Kill') : [job._sm_kill_retry__kill],
            ('Kill_Retry', 'Task_End') : [job._sm_kill_retry__task_end],
            ('Killing', 'Progress') : [job._sm_killing__progress],
            ('Killing', 'Hold') : [job._sm_kill_common__hold],
            ('Killing', 'Release') : [job._sm_kill_common__release],
            ('Killing', 'Preempt') : [job._sm_kill_common__preempt],
            ('Killing', 'Kill') : [job._sm_killing__kill],
            ('Killing', 'Task_End') : [job._sm_killing__task_end],
            ('Preempt_Retry', 'Progress') : [job._sm_preempt_retry__progress],
            ('Preempt_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Preempt_Retry', 'Release') : [job._sm_common__pending_release],
            ('Preempt_Retry', 'Kill') : [job._sm_preempt_retry__kill],
            ('Preempt_Retry', 'Task_End') : [job._sm_preempt_retry__task_end],
            ('Preempting', 'Progress') : [job._sm_preempting__progress],
            ('Preempting', 'Hold') : [job._sm_common__pending_hold],
            ('Preempting', 'Release') : [job._sm_common__pending_release],
            ('Preempting', 'Kill') : [job._sm_preempting__kill],
            ('Preempting', 'Task_End') : [job._sm_preempting__task_end],
            ('Preempt_Finalize_Retry', 'Progress') : [job._sm_preempt_finalize_retry__progress],
            ('Preempt_Finalize_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Preempt_Finalize_Retry', 'Release') : [job._sm_common__pending_release],
            ('Preempt_Finalize_Retry', 'Kill') : [job._sm_common__pending_kill],
            ('Preempt_Epilogue', 'Progress') : [job._sm_preempt_epilogue__progress],
            ('Preempt_Epilogue', 'Hold') : [job._sm_common__pending_hold],
            ('Preempt_Epilogue', 'Release') : [job._sm_common__pending_release],
            ('Preempt_Epilogue', 'Kill') : [job._sm_common__pending_kill],
            ('Preempted', 'Run') : [job._sm_preempted__run],
            ('Preempted', 'Hold') : [job._sm_preempted__hold],
            ('Preempted', 'Release') : [job._sm_preempted__release],
            ('Preempted', 'Kill') : [job._sm_preempted__kill],
            ('Preempted_Hold', 'Hold') : [job._sm_preempted_hold__hold],
            ('Preempted_Hold', 'Release') : [job._sm_preempted_hold__release],
            ('Preempted_Hold', 'Kill') : [job._sm_preempted_hold__kill],
            ('Finalize_Retry', 'Progress') : [job._sm_finalize_retry__progress],
            ('Finalize_Retry', 'Hold') : [job._sm_exit_common__hold],
            ('Finalize_Retry', 'Release') : [job._sm_exit_common__release],
            ('Finalize_Retry', 'Preempt') : [job._sm_exit_common__preempt],
            ('Finalize_Retry', 'Kill') : [job._sm_exit_common__kill],
            ('Resource_Epilogue', 'Progress') : [job._sm_resource_epilogue__progress],
            ('Resource_Epilogue', 'Hold') : [job._sm_exit_common__hold],
            ('Resource_Epilogue', 'Release') : [job._sm_exit_common__release],
            ('Resource_Epilogue', 'Preempt') : [job._sm_exit_common__preempt],
            ('Resource_Epilogue', 'Kill') : [job._sm_exit_common__kill],
            ('Resource_Epilogue_Retry', 'Progress') : [job._sm_resource_epilogue_retry__progress],
            ('Resource_Epilogue_Retry', 'Hold') : [job._sm_exit_common__hold],
            ('Resource_Epilogue_Retry', 'Release') : [job._sm_exit_common__release],
            ('Resource_Epilogue_Retry', 'Preempt') : [job._sm_exit_common__preempt],
            ('Resource_Epilogue_Retry', 'Kill') : [job._sm_exit_common__kill],
            ('Job_Epilogue', 'Progress') : [job._sm_job_epilogue__progress],
            ('Job_Epilogue', 'Hold') : [job._sm_exit_common__hold],
            ('Job_Epilogue', 'Release') : [job._sm_exit_common__release],
            ('Job_Epilogue', 'Preempt') : [job._sm_exit_common__preempt],
            ('Job_Epilogue', 'Kill') : [job._sm_exit_common__kill],
            ('Job_Epilogue_Retry', 'Progress') : [job._sm_job_epilogue_retry__progress],
            ('Job_Epilogue_Retry', 'Hold') : [job._sm_exit_common__hold],
            ('Job_Epilogue_Retry', 'Release') : [job._sm_exit_common__release],
            ('Job_Epilogue_Retry', 'Preempt') : [job._sm_exit_common__preempt],
            ('Job_Epilogue_Retry', 'Kill') : [job._sm_exit_common__kill],
            }
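
# Example (sketch): the map returned above is keyed by (state, event) pairs, and each value is the list of handler
# methods to run when that event arrives in that state; e.g., for an existing Job instance 'job':
#
#     >>> seas = get_job_sm_seas(job)
#     >>> seas[('Running', 'Kill')]
#     [<bound method Job._sm_running__kill of ...>]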

class Job (StateMachine):
    """
    The Job class tracks a single job and drives it at a high level.  Actual operations on the job, such as execution
    or termination, are the responsibility of the system component.
    """

    acctlog = Cobalt.Util.AccountingLog('qm')

    # properties for easier accounting logging
    ctime = property(lambda self: self.__timers['queue'].start_times[0])
    qtime = property(lambda self:
        self.__timers['current_queue'].start_times[0])
    start = property(lambda self: self.__timers['user'].start_times[-1])
    exec_host = property(lambda self: ":".join(self.__locations[-1]))
    end = property(lambda self: self.__timers['user'].stop_times[-1])

    fields = Data.fields + [
        "jobid", "jobname", "state", "attribute", "location", "starttime", 
        "submittime", "endtime", "queue", "type", "user",
        "walltime", "procs", "nodes", "mode", "cwd", "command", "args", 
        "outputdir", "project", "lienID", "stagein", "stageout",
        "reservation", "host", "port", "url", "stageid", "envs", "inputfile", 
        "kernel", "kerneloptions", "ion_kernel", "ion_kerneloptions", "admin_hold",
        "user_hold", "dependencies", "notify", "adminemail", "outputpath",
        "errorpath", "cobalt_log_file", "path", "preemptable", "preempts",
        "mintasktime", "maxtasktime", "maxcptime", "force_kill_delay", 
        "is_runnable", "is_active",
        "has_completed", "sm_state", "score", "attrs", "has_resources", 
        "exit_status", "dep_frac", "walltime_p", "user_list", "runid",
        "geometry"
    ]

    _states = get_job_sm_states() + StateMachine._states

    _transitions = get_job_sm_transitions()

    _initial_state = get_job_sm_initial_state()
    _events = get_job_sm_events() + StateMachine._events

    # return codes to improve typo detection.  by using these class attributes in condition statements, a typo will
    # raise an error immediately rather than causing an incorrectly followed path.
    __rc_success = "success"
    __rc_retry = "retry"
    __rc_pg_create = "pg_create"
    __rc_xmlrpc = "xmlrpc"
    __rc_unknown = "unknown"

    def __init__(self, spec):
        self.initializing = True
        seas = get_job_sm_seas(self)

        # StateMachine.__init__(self, spec, seas = seas, terminal_actions = [(self._sm_terminal, {})])
        StateMachine.__init__(self, spec, seas = seas)

        logger.info(str(spec))

        self.jobid = spec.get("jobid")
        self.umask = spec.get("umask")
        self.jobname = spec.get("jobname", "N/A")
        self.attribute = spec.get("attribute", "compute")
        self.starttime = spec.get("starttime", "-1")
        self.submittime = spec.get("submittime", time.time())
        self.endtime = spec.get("endtime", "-1")
        self.type = spec.get("type", "mpish")
        self.user = spec.get("user")
        self.__walltime = int(float(spec.get("walltime", 0)))
        self.walltime_p = spec.get("walltime_p", self.walltime)    #  *AdjEst*
        self.procs = spec.get("procs")
        self.nodes = spec.get("nodes")
        self.cwd = spec.get("cwd")
        self.command = spec.get("command")
        self.args = spec.get("args", [])
        self.project = spec.get("project")
        self.lienID = spec.get("lienID")
        self.stagein = spec.get("stagein") #does nothing
        self.stageout = spec.get("stageout") #like ze goggles, it does nothing
        self.reservation = spec.get("reservation", False) #appears to be defunct.
        self.host = spec.get("host")
        self.port = spec.get("port")
        self.url = spec.get("url")
        self.stageid = spec.get("stageid")
        self.inputfile = spec.get("inputfile")
        self.tag = spec.get("tag", "job")
        self.kernel = spec.get("kernel", get_config_option('bgsystem', 'cn_default_kernel', 'default'))
        self.kerneloptions = spec.get("kerneloptions")
        self.ion_kernel = spec.get("ion_kernel",  get_config_option('bgsystem', 'ion_default_kernel', 'default'))
        self.ion_kerneloptions = spec.get("ion_kerneloptions")
        self.notify = spec.get("notify")
        self.adminemail = spec.get("adminemail")
        self.location = spec.get("location")
        self.__locations = []
        self.outputpath = spec.get("outputpath")
        if self.outputpath and self.jobname == 'N/A':
            jname = self.outputpath.split('/')[-1].split('.output')[0]
            if jname and jname != str(self.jobid):
                self.jobname = jname
        self.outputdir = spec.get("outputdir")
        self.errorpath = spec.get("errorpath")
        self.cobalt_log_file = spec.get("cobalt_log_file")
        if not self.cobalt_log_file:
            self.cobalt_log_file = "%s/%s.cobaltlog" % (self.outputdir, self.jobid)
        else:
            t = string.Template(self.cobalt_log_file)
            self.cobalt_log_file = t.safe_substitute(jobid=self.jobid)
        self.path = spec.get("path")
        self.mode = spec.get("mode", "co")
        self.envs = spec.get("envs", {})
        self.force_kill_delay = spec.get("force_kill_delay",
                get_cqm_config('force_kill_delay', DEFAULT_FORCE_KILL_DELAY))
        self.attrs = spec.get("attrs", {})

        self.score = float(spec.get("score", 0.0))

        self.__resource_nodects = []
        self.__timers = dict(
            queue = Timer(),
            current_queue = Timer(),
            user = Timer(),
        )

        # setting the queue will cause updated accounting records to be
        #written and the current queue timer to be restarted, so
        # this needs to be done only after the object has been initialized
        self.queue = spec.get("queue", "default")
        self.resid = None #Must be obtained from the scheduler component.
        self.__timers['queue'].start()
        self.etime = time.time()

        self.__admin_hold = False
        self.__user_hold = False
        self.__dep_hold = False

        # setting the hold flags will automatically cause the appropriate hold
        #events to be triggered, so this needs to be done
        # only after the object has been completely initialized
        if spec.get("admin_hold", False):
            self.admin_hold = True
        if spec.get("user_hold", False):
            self.user_hold = True

        self.dep_fail = False
        self.dep_frac = None #float(get_cqm_config('dep_frac', 0.5))
        self.all_dependencies = spec.get("all_dependencies")
        self.satisfied_dependencies = []
        if self.all_dependencies:
            self.all_dependencies = str(self.all_dependencies).split(":")
            logger.info("Job %s/%s: dependencies set to %s", self.jobid, self.user, ":".join(self.all_dependencies))
            # read passed dependencies and set dep_hold if necessary
            # set dep_hold as needed:
            self.update_dep_state()
        else:
            self.all_dependencies = []


        self.preemptable = spec.get("preemptable", False)
        self.__preempts = 0
        if self.preemptable:
            self.mintasktime = int(float(spec.get("mintasktime", 0)))
            self.maxtasktime = int(float(spec.get("maxtasktime", 0)))
            self.maxcptime = int(float(spec.get("maxcptime", 0)))

        self.geometry = spec.get("geometry", None)

        self.taskid = None
        self.task_running = False
        self.exit_status = None
        self.max_running = False

        self.total_etime = 0.0
        self.priority_core_hours = None
        self.user_list = spec.get('user_list', [self.user])

        #for improved script handling:
        self.job_prescript_ids = []
        self.job_postscript_ids = []
        self.resource_prescript_ids = []
        self.resource_postscript_ids = []

        self.runid = None #A job starts once.
        #prebooting is assumed for script jobs, unless the user requests that it be turned off
        #This is here for BG/Q ensemble job support.  --PMR
        self.script_preboot = spec.get("script_preboot", True) #preboot a block for a script.


        dbwriter.log_to_db(self.user, "creating", "job_data", JobDataMsg(self))
        if self.admin_hold:
            dbwriter.log_to_db(self.user, "admin_hold", "job_prog", JobProgMsg(self))
        if self.user_hold:
            dbwriter.log_to_db(self.user, "user_hold", "job_prog", JobProgMsg(self))


        self.initializing = False

    # end def __init__()

    def no_holds_left(self):
        '''Check for whether any holds are set on a job'''
        return not (self.admin_hold or 
                self.user_hold or 
                self.dep_hold or 
                self.max_running)

    def __getstate__(self):
        data = {}
        for key, value in self.__dict__.iteritems():
            if key not in ['log', 'comms', 'acctlog']:
                data[key] = value
        return data

    def __setstate__(self, state):
        self.__dict__.update(state)

        #reset the statemachine's states
        self.update_seas(get_job_sm_seas(self))

        # BRT: why is the current queue timer being reset?  if cqm is 
        #restarted, the job remained in the queue during that time, so I would
        #think that timer should continue to run during the restart rather 
        #than be reset.
        if not self.__timers.has_key('current_queue'):
            self.__timers['current_queue'] = Timer()
            self.__timers['current_queue'].start()

        # special case to handle missing data from old state files
        if not state.has_key("total_etime"):
            logger.info("old job missing total_etime")
            self.total_etime = 0.0

        if not state.has_key("priority_core_hours"):
            logger.info("old job missing priority_core_hours")
            self.priority_core_hours = None

        if not state.has_key("dep_fail"):
            logger.info("old job missing dep_fail")
            self.dep_fail = False

        if not state.has_key("dep_frac"):
            logger.info("old job missing dep_frac")
            self.dep_frac = None

        if not state.has_key("user_list"):
            logger.info("old job missing user_list")
            self.user_list = [self.user]

        if not state.has_key("walltime_p"):
            logger.info("old job missing walltime_p")
            self.walltime_p = self.walltime

        if not state.has_key("resid"):
            logger.info("old job missing resid")
            self.resid = None

        if not state.has_key("job_prologue_ids"):
            self.job_prologue_ids = None
        if not state.has_key("job_epilogue_ids"):
            self.job_epilogue_ids = None
        if not state.has_key("resource_prologue_ids"):
            self.resource_prologue_ids = None
        if not state.has_key("resource_epilogue_ids"):
            self.resource_epilogue_ids = None
        if not state.has_key("geometry"):
            self.geometry = None
        if not state.has_key("script_preboot"):
            self.script_preboot = True
        if not state.has_key('ion_kernel'):
            #if ion kernel doesn't exist, neither will ion_kerneloptions
            self.ion_kernel = get_config_option('bgsystem', 'ion_default_kernel', 'default')
            self.ion_kerneloptions = get_config_option('bgsystem', 'ion_default_kernel_options', False)
            if self.ion_kerneloptions == False:
                self.ion_kerneloptions = None
        self.runid = state.get("runid", None)
        #for old statefiles, make sure to update the dependency state on restart:
        self.__dep_hold = False
        self.update_dep_state()
        self.initializing = False

    def __task_signal(self, retry = True):
        '''send a signal to the managed task'''
        # BRT: this routine should probably check if the task could not be 
        #signaled because it was no longer running
        try:
            self._sm_log_info("instructing the system component to send signal %s" % (self.__signaling_info.signal,))
            pgroup = ComponentProxy("system").signal_process_groups([{'id':self.taskid}], self.__signaling_info.signal)
        except (ComponentLookupError, xmlrpclib.Fault), e:
            #
            # BRT: will a ComponentLookupError ever be raised directly or will 
            # it always be buried in a XML-RPC fault?
            #
            # BRT: shouldn't we be checking the XML-RPC fault code?  which 
            # fault codes are valid for this operation?  at the very least 
            # unexpected fault code should be reported as such and the retry 
            # loop broken.
            #
            if retry:
                self._sm_log_warn("failed to communicate with the system component (%s); retry pending" % (e,))
                return Job.__rc_retry
            else:
                self._sm_log_warn("failed to communicate with the system component (%s); manual cleanup may be required" % \
                    (e,))
                return Job.__rc_xmlrpc
        except:
            self._sm_raise_exception("unexpected error from the system component; manual cleanup may be required")
            return Job.__rc_unknown

        self.__signaled_info = self.__signaling_info
        del self.__signaling_info
        return Job.__rc_success
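
    # Example (sketch): callers compare the returned code against the Job.__rc_* constants defined above; for instance
    # _sm_kill_task() below does roughly
    #
    #     rc = self.__task_signal()
    #     if rc == Job.__rc_success:
    #         ... start the force-kill timer and move to 'Killing' ...
    #     else:
    #         ... move to 'Kill_Retry' ...
    #
    # so a misspelled constant fails loudly with an AttributeError instead of silently taking the wrong branch.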

    def __task_run(self):
        global run_id_gen
        walltime = self.walltime
        if self.runid == None: #allow us to unset this for preemption
            self.runid = run_id_gen.next() #Don't try and run this twice.

        if self.preemptable and self.maxtasktime < walltime:
            walltime = self.maxtasktime
        try:
            self._sm_log_info("instructing the system component to begin executing the task")
            pgroup = ComponentProxy("system", retry=False).add_process_groups([{
                'id':"*",
                'jobid':self.jobid,
                'user':self.user,
                'stdin':self.inputfile,
                'stdout':self.outputpath,
                'stderr':self.errorpath,
                'cobalt_log_file':self.cobalt_log_file,
                'size':self.procs,
                'nodect':"*",
                'mode':self.mode,
                'script_preboot':self.script_preboot,
                'cwd':self.outputdir,
                'executable':self.command,
                'args':self.args,
                'env':self.envs,
                'location':self.location,
                'umask':self.umask,
                'kernel':self.kernel,
                'kerneloptions':self.kerneloptions,
                'ion_kernel':self.ion_kernel,
                'ion_kerneloptions':self.ion_kerneloptions,
                'starttime':self.starttime,
                'walltime':walltime,
                'killtime':self.force_kill_delay + 1,
                'resid': self.resid,
                'runid': self.runid,
                'attrs': self.attrs,
            }])
            if pgroup[0].has_key('id'):
                self.taskid = pgroup[0]['id']
                if pgroup[0].has_key('nodect') and pgroup[0]['nodect'] != None:
                    self.__resource_nodects.append(pgroup[0]['nodect'])
                else:
                    self.__resource_nodects.append(self.nodes)
            else:
                self._sm_log_error("process group creation failed", 
                        cobalt_log = True)
                return Job.__rc_pg_create
        except (ComponentLookupError, xmlrpclib.Fault), e:
            self._sm_log_warn("failed to execute the task (%s); retry pending" % (e,))
            return Job.__rc_retry
        except:
            self._sm_raise_exception("unexpected error returned from the system component when attempting to add task",
                cobalt_log = True)
            return Job.__rc_unknown

        return Job.__rc_success

    def __task_finalize(self):
        '''get exit code from system component'''
        try:
            result = ComponentProxy("system").wait_process_groups([{'id':self.taskid, 'exit_status':'*'}])
            if result:
                self.exit_status = result[0].get('exit_status')
                dbwriter.log_to_db(None, "exit_status_update", "job_prog", 
                    JobProgExitStatusMsg(self))



            else:
                self._sm_log_warn("system component was unable to locate the task; exit status not obtained")
        except (ComponentLookupError, xmlrpclib.Fault), e:
            self._sm_log_warn("failed to communicate with the system component (%s); retry pending" % (e,))
            return Job.__rc_retry
        except:
            self._sm_raise_exception("unexpected error returned from the system component while finalizing task")
            return Job.__rc_unknown

        self.taskid = None
        return Job.__rc_success

    def __release_resources(self):
        '''release computing resources that may still be reserved by the job'''
        try:
            ComponentProxy("system").reserve_resources_until(self.location, None, self.jobid)
            return Job.__rc_success
        except (ComponentLookupError, xmlrpclib.Fault), e:
            self._sm_log_warn("failed to communicate with the system component (%s); retry pending" % (e,))
            return Job.__rc_retry
        except:
            self._sm_raise_exception("unexpected error returned from the system component while releasing resources")
            return Job.__rc_unknown

    def _sm_log_debug(self, msg, cobalt_log = False):
        '''write a debug message to the CQM log that includes state machine status'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        logger.debug("Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg))
        if cobalt_log:
            self.__write_cobalt_log("Debug: %s" % (msg,))

    def _sm_log_info(self, msg, cobalt_log = False):
        '''write an informational message to the CQM log that includes state machine status'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        logger.info("Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg))
        if cobalt_log:
            self.__write_cobalt_log("Info: %s" % (msg,))

    def _sm_log_warn(self, msg, cobalt_log = False):
        '''write a warning message to the CQM log that includes state machine status'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        logger.warning("Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg))
        if cobalt_log:
            self.__write_cobalt_log("Warning: %s" % (msg,))

    def _sm_log_error(self, msg, tb_flag = True, skip_tb_entries = 1, cobalt_log = False):
        '''write an error message to the CQM log that includes state machine status and a stack trace'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        full_msg = "Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg)
        if tb_flag:
            stack = traceback.format_stack()
            last_tb_entry = len(stack) - skip_tb_entries
            for entry in stack[:last_tb_entry]:
                full_msg += "\n    " + entry[:-1]
        logger.error(full_msg)
        if cobalt_log:
            self.__write_cobalt_log("ERROR: %s" % (msg,))

    def _sm_log_exception(self, msg, cobalt_log = False):
        '''write an error message to the CQM log that includes state machine status and a stack trace'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        full_msg = "Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg)
        (exc_cls, exc, tb) = sys.exc_info()
        exc_str = traceback.format_exception_only(exc_cls, exc)[0]
        full_msg += "\n    Exception: %s" % (exc_str)
        stack = traceback.format_tb(tb)
        for entry in stack:
            full_msg += "\n    " + entry[:-1]
        logger.error(full_msg)
        if cobalt_log:
            self.__write_cobalt_log("EXCEPTION: %s" % (full_msg,))

    def _sm_raise_exception(self, msg, cobalt_log = False):
        self._sm_log_error(msg, skip_tb_entries = 2, cobalt_log = cobalt_log)
        raise JobProcessingError(msg, self.jobid, self.user, self.state, self._sm_state, self._sm_event)

    def _sm_log_user_delete(self, signame, user = None, pending = False):
        if user != None:
            umsg = " by user %s" % (user,)
        else:
            umsg = ""
        if pending == True:
            pmsg = " now pending"
        else:
            pmsg = ""
        self._sm_log_info("user delete requested with signal %s%s%s" % (signame, umsg, pmsg), cobalt_log = True)

    def _sm_signaling_info_set_user_delete(self, signame = Signal_Map.terminate, user = None, pending = False):
        self.__signaling_info = Signal_Info(Signal_Info.Reason.delete, signame, user, pending)
        self._sm_log_user_delete(signame, user, pending)

    def _sm_check_job_timers(self):
        if self.__max_job_timer.has_expired:
            # if the job execution time has exceeded the wallclock time, then inform the task that it must terminate
            self._sm_log_info("maximum execution time exceeded; initiating job termination", cobalt_log = True)
            accounting_logger.info(accounting.abort(self.jobid))
            return Signal_Info(Signal_Info.Reason.time_limit, Signal_Map.terminate)
        else:
            return None

    def _sm_kill_task(self):
        '''initiate the user deletion of a task by signaling it and then changing to the appropriate state'''
        # BRT: this routine should probably check if the task could not be signaled because it was no longer running
        rc = self.__task_signal()
        if rc == Job.__rc_success:
            # start the signal timer so that the state machine knows when to escalate to sending a force kill signal
            if self.__signaled_info.signal != Signal_Map.force_kill:
                self._sm_log_debug("setting force kill signal timer to %d seconds" % (self.force_kill_delay * 60,))
                self.__signal_timer = Timer(self.force_kill_delay * 60)
            else:
                self.__signal_timer = Timer()
            self.__signal_timer.start()
            self._sm_state = 'Killing'
            return True
        else:
            self._sm_state = 'Kill_Retry'
            return False

    def _sm_check_preempt_timers(self):
        '''
        if maximum resource timer has expired, or a preemption is pending and the minimum resource time has been exceeded, then
        inform the task it's time to checkpoint and terminate
        '''
        if self.__maxtasktimer.has_expired:
            self._sm_log_info("maximum resource time exceeded; initiating job preemption")
            if self.maxcptime > 0:
                return Signal_Info(Signal_Info.Reason.preempt, Signal_Map.checkpoint)
            else:
                return Signal_Info(Signal_Info.Reason.preempt, Signal_Map.terminate)
        elif has_private_attr(self, '__signaling_info') and self.__signaling_info.reason == Signal_Info.Reason.preempt and \
                self.__signaling_info.pending and self.__mintasktimer.has_expired:
            self._sm_log_info("preemption pending and resource time exceeded; initiating job preemption")