#!/usr/bin/env python
# $Id$

'''Cobalt Queue Manager'''

#
# TODO:
#
# - modify progress routine to catch exceptions and report them using _sm_log_exception.  should some or all exceptions cause
#   the job to be terminated?
#
# - make a pass through the _sm_log calls and decide the correct error levels
#
# Changes:
#
# - (Job) the system_state and user_state fields no longer exist.
#
# - (Job) the admin_hold and user_hold fields are now set to True or False to respectively hold or release a job.
#
# - (Job) setting the preemptable field to true specifies that a task associated with the job may be preempted.
#
# - (Job) the is_runnable field is true when the job is able to run (ready or preempted states) and false otherwise.
#
# - (Job) the is_active field is set to true when the job is in any state other than ready (queued), held, preempted or completed
#   (terminal).
#
# - (Job) the has_resources field is set to true if resources are assigned to the job.  this assumes that resources may not be
#   deallocated until after the resource epilogue scripts have been run and thus remains true until those scripts have completed.
#   in reality, the system component may reacquire control of the resources and possibly reassign them as soon as the task
#   terminates.  this may occur even before cqm notices that the task has ended.  unfortunately, this is the best we can do
#   without acquiring resource state from the system component.
#
# - (Job) the has_completed field is set to true only once the job has finished (reached the terminal state).  a value of true
#   does not imply that the job completed successfully, only that it is not active and may never be run again.
#
# - (Job) the preempts field contains a count of the number of times a task associated with the job has been
#   preempted.  this field is read-only and only exists if the job is preemptable.
#
# - (Job) the maxcptime field specifies the time needed for a task to checkpoint.  if maxcptime is greater than zero, the task
#   will first be signaled to checkpoint (SIGUSR1).  after the specified amount of time elapses, the task will then be signaled
#   to terminate (SIGTERM).  if maxcptime is zero, the task will immediately be signaled to terminate.  (BRT: is this the right
#   thing to do?  how will the job know the difference between being preempted and killed?  is it necessary for it to know the
#   difference when no checkpoint time is requested?)
#
# - (Job) the mintasktime field specifies the minimum amount of time a task must run before it may be preempted.  any
#   preemption requests made prior to the specified amount of time elapsing will be recorded and processed once the specified
#   amount of time has elapsed.  notification to checkpoint will be made prior to preemption as indicated by the maxcptime.
#
# - (Job) the maxtasktime field specifies the maximum amount of time a task will be allowed to run before being preempted.
#   after the specified amount of time has elapsed the task will be signaled to terminate (SIGTERM).  notification to checkpoint
#   will be made prior to preemption as indicated by maxcptime.
#
# - (Job) setting the walltime field is no longer required (at least by the cqm component).  instead, the maxtasktime field
#   may be used to specify repeated timeslices to be given to the job until it terminates.  (BRT: this is a first stab at
#   timeslicing.  eventually, we may want the scheduler component to have some say in whether a job gets preempted after the
#   maximum amount of time has been reached, allowing a job to continue to run past its specified timeslice if no resource
#   contention exists.)
#
# - (Job) the 'force_kill_delay' field specifies the time that must elapse before a task being preempted or killed should be
#   forcibly terminated (with SIGKILL).  if the field is not set in the job instance, the value of 'force_kill_delay' in the cqm
#   section of the config file will be used.  if 'force_kill_delay' is not found in the cqm section of the config file, then the
#   module attribute DEFAULT_FORCE_KILL_DELAY is used (see below).
#
# Assumptions:
#
# - (Job/CQM) the system component will return a non-zero exit status if the task was killed by a signal.  'None' is considered
#   to be non-zero and thus would be a valid exit status if the task was terminated.
#
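#
# Example (illustrative only, not part of the component): a job spec dict as
# cqm's Job constructor might receive it, showing how the hold and preemption
# fields described above are expressed.  The field names come from the
# descriptions above; the values are hypothetical.
#
#   spec = {'jobid': 1234, 'user': 'someuser', 'queue': 'default',
#           'walltime': 30,                            # minutes
#           'admin_hold': False, 'user_hold': True,    # hold/release flags
#           'preemptable': True,
#           'mintasktime': 10, 'maxtasktime': 20, 'maxcptime': 2,
#           'force_kill_delay': 5}                     # minutes before SIGKILL
#   job = Job(spec)
#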


import errno
import logging
import os
import pwd
import grp
import sys
import time
import math
import types
import xmlrpclib
import ConfigParser
import signal
import thread
from threading import Thread, Lock
import traceback
import string


import Cobalt
import Cobalt.Util
import Cobalt.Logging  # used below for the database log writer (Cobalt.Logging.dbwriter)
from Cobalt.Util import Timer, pickle_data, unpickle_data, disk_writer_thread
import Cobalt.Cqparse
from Cobalt.Data import Data, DataList, DataDict, IncrID
from Cobalt.StateMachine import StateMachine
from Cobalt.Components.base import Component, exposed, automatic, query, locking
from Cobalt.Proxy import ComponentProxy
from Cobalt.Exceptions import (QueueError, ComponentLookupError, DataStateError, DataStateTransitionError, StateMachineError,
    StateMachineIllegalEventError, StateMachineNonexistentEventError, ThreadPickledAliveException, JobProcessingError,
    JobRunError, JobPreemptionError, JobDeleteError, IncrIDError, ResourceReservationFailure)
from Cobalt import accounting
from Cobalt.Statistics import Statistics
from Cobalt.Util import get_config_option, init_cobalt_config

__revision__ = '$Revision$'
init_cobalt_config()

DEFAULT_FORCE_KILL_DELAY = 5  # (in minutes)
CLOB_SIZE = 4096

logger = logging.getLogger(__name__.split('.')[-1])

cqm_id_gen = None
run_id_gen = None #IncrID()

cqm_forker_tag = "cqm_script"
job_prescript_tag = "job prescript"
job_postscript_tag = "job postscript"
resource_postscript_tag = "resource postscript"

config = ConfigParser.ConfigParser()
config.read(Cobalt.CONFIG_FILES)
if not config.has_section('cqm'):
    print '''"cqm" section missing from cobalt config file'''
    sys.exit(1)

def get_cqm_config(option, default):
    try:
        value = config.get('cqm', option)
    except ConfigParser.NoOptionError:
        value = default
    return value

def get_bgsched_config(option, default):
    try:
        value = config.get('bgsched', option)
    except ConfigParser.NoOptionError:
        value = default
    return value

# *AdjEst*
def get_histm_config(option, default):
    try:
        value = config.get('histm', option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        value = default
    return value

CQM_SCALE_DEP_FRAC = str(get_cqm_config('scale_dep_frac', 'false')).lower() in Cobalt.Util.config_true_values

walltime_prediction = get_histm_config("walltime_prediction", "False").lower()   # *AdjEst*
walltime_prediction_configured = False
walltime_prediction_enabled = False
if walltime_prediction  == "true":
    walltime_prediction_configured = True
    walltime_prediction_enabled = True
#print "walltime_prediction_configured=", walltime_prediction_configured 

prediction_scheme = get_histm_config("prediction_scheme", "combined").lower()  # ["project", "user", "combined"]   # *AdjEst*
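
# Illustrative [histm] config entries for the walltime-prediction options read
# above (only the option names come from this file; the values are hypothetical):
#
#   [histm]
#   walltime_prediction: true
#   prediction_scheme: combined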

accounting_logdir = os.path.expandvars(get_cqm_config("log_dir", Cobalt.DEFAULT_LOG_DIRECTORY))
accounting_logger = logging.getLogger("cqm.accounting")
accounting_logger.addHandler(
    accounting.DatetimeFileHandler(os.path.join(accounting_logdir, "%Y%m%d")))


dbwriter = Cobalt.Logging.dbwriter(logging)
use_db_logging = get_cqm_config('use_db_logging','false')
if use_db_logging.lower() in Cobalt.Util.config_true_values:
    dbwriter.enabled = True
    overflow_filename = get_cqm_config('overflow_file', None)
    max_queued = int(get_cqm_config('max_queued_msgs', '-1'))
    if max_queued <= 0:
        max_queued = None
    if (overflow_filename == None) and (max_queued != None):
        logger.warning('No filename set for database logging messages, max_queued_msgs set to unlimited')
    if max_queued != None:
        dbwriter.overflow_filename = overflow_filename
        dbwriter.max_queued = max_queued
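
# Illustrative [cqm] config entries for the database-logging options read
# above (the option names come from this file; the values are hypothetical):
#
#   [cqm]
#   use_db_logging: true
#   max_queued_msgs: 20000
#   overflow_file: /var/spool/cobalt/cqm_overflow.dat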

#writer so that cobalt log writes don't hang up scheduling.
cobalt_log_writer = disk_writer_thread()
cobalt_log_writer.daemon = True
cobalt_log_writer.start()

def cobalt_log_write(filename, msg):
    '''send the cobalt_log writer thread a filename, msg tuple.

    '''
    cobalt_log_writer.send((filename, msg))

def cobalt_log_terminate():
    '''Terminate the writer thread by sending it None.

    '''
    cobalt_log_writer.send(None)

def str_elapsed_time(elapsed_time):
    return "%d:%02d:%02d" % (elapsed_time / 3600, elapsed_time / 60 % 60, elapsed_time % 60)

def has_private_attr(obj, attr):
    assert attr[0:2] == "__"
    return hasattr(obj, "_" + obj.__class__.__name__ + attr)

def has_semi_private_attr(obj, attr):
    assert attr[0:1] == "_"
    return hasattr(obj, attr)


class Signal_Map (object):
    checkpoint = 'SIGUSR1'
    terminate = 'SIGTERM'
    force_kill = 'SIGKILL'

class Signal_Info (object):
    class Reason (object):
        delete = 'delete'
        preempt = 'preempt'
        time_limit = 'time limit'

    def __init__(self, reason = None, signame = None, user = None, pending = False):
        self.__reason = reason
        self.__signal = signame
        self.__user = user
        self.__pending = pending

    def __get_reason(self):
        return self.__reason

    def __set_reason(self, reason):
        if reason in (Signal_Info.Reason.delete, Signal_Info.Reason.preempt, Signal_Info.Reason.time_limit):
            self.__reason = reason
        else:
            raise ValueError, "illegal signal reason: %s" % (reason,)

    reason = property(__get_reason, __set_reason)

    def __get_signal(self):
        return self.__signal

    def __set_signal(self, signame):
        if signame == None:
            self.__signal = None
            return
        if not isinstance(signame, str):
            raise TypeError, "signal name must be a string"
        if signame[0:3] == "SIG" and hasattr(signal, signame):
            self.__signal = signame
        else:
            raise ValueError, "unknown signal name: %s" % (signame,)

    signal = property(__get_signal, __set_signal)

    def __get_user(self):
        return self.__user

    def __set_user(self, user):
        if isinstance(user, str) or user == None:
            self.__user = user
        else:
            raise TypeError, "user name must be a string or None"

    user = property(__get_user, __set_user)

    def __get_pending(self):
        return self.__pending

    def __set_pending(self, pending):
        if isinstance(pending, bool):
            self.__pending = pending
        else:
            raise TypeError, "pending flag must be a boolean value"

    pending = property(__get_pending, __set_pending)
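
# Illustrative only: a pending user-delete request using the classes above
# might be represented as
#   Signal_Info(Signal_Info.Reason.delete, Signal_Map.terminate, 'someuser', True)
# ('someuser' is a hypothetical user name).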


#TODO: I want to get rid of this.  I don't want to see threaded-forks in 
# cobalt ever again. --PR
class RunScriptsThread(object):
    '''stub for restart compatibility'''
    def __init__(self):
        pass

def get_job_sm_states():

    return ['Ready',
        'Hold',
        'Job_Prologue',
        'Job_Prologue_Retry',
        'Job_Prologue_Retry_Release',
        'Resource_Prologue',
        'Resource_Prologue_Retry',
        'Resource_Prologue_Retry_Release',
        'Run_Retry',
        'Running',
        'Kill_Retry',
        'Killing',
        'Preempt_Retry',
        'Preempting',
        'Preempt_Finalize_Retry',
        'Preempt_Epilogue',
        'Preempted',
        'Preempted_Hold',
        'Finalize_Retry',
        'Resource_Epilogue',
        'Resource_Epilogue_Retry',
        'Job_Epilogue',
        'Job_Epilogue_Retry']

def get_job_sm_transitions():

    return [
        ('Ready', 'Hold'),                                  # user/admin hold
        ('Ready', 'Job_Prologue'),                          # run; start prologue scripts
        ('Ready', 'Job_Prologue_Retry'),                    # run; error contacting forker component; scripts not running, yet
        ('Ready', 'Terminal'),                              # kill
        ('Hold', 'Ready'),                                  # user/admin release, no holds
        ('Hold', 'Terminal'),                               # kill
        ('Job_Prologue', 'Resource_Prologue'),              # job_prologue scripts complete.  Starting Resource_prologue scripts
        ('Job_Prologue', 'Resource_Prologue_Retry'),        # error contacting forker component
        ('Job_Prologue', 'Job_Prologue_Retry'),             # Lost communication to forker during progress
        ('Job_Prologue', 'Job_Epilogue'),                   # kill / job prologue failed
        ('Job_Prologue', 'Job_Epilogue_Retry'),             # kill / job prologue failed.  Error communicating with forker
        ('Job_Prologue', 'Job_Prologue_Retry_Release'),     # kill / job prologue failed.  error releasing system resources
        ('Job_Prologue_Retry', 'Job_Prologue'),             # forker starting job prologue scripts
        ('Job_Prologue_Retry', 'Terminal'),                 # kill; error contacting forker component
        ('Job_Prologue_Retry_Release', 'Job_Epilogue'),     # resource successfully released
        ('Job_Prologue_Retry_Release', 'Job_Epilogue_Retry'),     # error contacting forker component
        ('Resource_Prologue', 'Run_Retry'),                 # prologue scripts complete; error contacting system component
        ('Resource_Prologue', 'Running'),                   # prologue scripts complete; system component starting task
        ('Resource_Prologue', 'Resource_Prologue_Retry'),   # Lost communication to forker during progress
        ('Resource_Prologue', 'Resource_Epilogue'),         # kill / resource prologue failed; system component released resource
        ('Resource_Prologue', 'Resource_Epilogue_Retry'),   # kill / resource prologue failed; error contacting forker component
        ('Resource_Prologue', 'Resource_Prologue_Retry_Release'), # kill / job_prologue failed.  error releasing system resources
        ('Resource_Prologue_Retry','Resource_Prologue'),    # run resource prologue scripts
        ('Resource_Prologue_Retry','Job_Epilogue'),         # kill; run any required job cleanup
        ('Resource_Prologue_Retry_Release', 'Resource_Epilogue'),       # resource successfully released
        ('Resource_Prologue_Retry_Release', 'Resource_Epilogue_Retry'), # error contacting forker component
        ('Run_Retry', 'Running'),                           # system component starting task
        ('Run_Retry', 'Resource_Epilogue'),                 # kill
        ('Run_Retry', 'Resource_Epilogue_Retry'),           # kill; error contacting forker component
        ('Running', 'Kill_Retry'),                          # kill; error contacting system component
        ('Running', 'Killing'),                             # kill; system component signaling task
        ('Running', 'Preempt_Retry'),                       # preempt; error contacting system component
        ('Running', 'Preempting'),                          # preempt; system component signaling task
        ('Running', 'Finalize_Retry'),                      # task execution complete; error finalizing task and obtaining exit
                                                            #     status
        ('Running', 'Resource_Epilogue'),                   # task execution complete; task finalized and exit status obtained
        ('Running', 'Resource_Epilogue_Retry'),             # task execution complete; error contacting forker component
        ('Kill_Retry', 'Kill_Retry'),                       # handle multiple task signaling failures
        ('Kill_Retry', 'Killing'),                          # system component signaling task
        ('Kill_Retry', 'Finalize_Retry'),                   # task execution complete/terminated; task finalization failed
        ('Kill_Retry', 'Resource_Epilogue'),                # task execution completed/terminated successfully
        ('Kill_Retry', 'Resource_Epilogue_Retry'),          # task execution completed/terminated successfully; error contacting
                                                            #     forker component
        ('Killing', 'Kill_Retry'),                          # new signal, or kill timer expired; error contacting system component
        ('Killing', 'Killing'),                             # kill timer expired; escalating to a forced kill
        ('Killing', 'Finalize_Retry'),                      # task execution complete/terminated; task finalization failed
        ('Killing', 'Resource_Epilogue'),                   # task execution complete/terminated; task finalization successful
        ('Killing', 'Resource_Epilogue_Retry'),             # task execution complete/terminated; task finalization successful;
                                                            #   error contacting forker component
        ('Preempt_Retry', 'Kill_Retry'),                    # kill; error contacting system component
        ('Preempt_Retry', 'Killing'),                       # kill; system component signaling task
        ('Preempt_Retry', 'Preempt_Retry'),                 # handle multiple task signaling failures
        ('Preempt_Retry', 'Preempting'),                    # system component signaling task
        ('Preempt_Retry', 'Preempt_Finalize_Retry'),        # task execution terminated, task finalization failed
        ('Preempt_Retry', 'Preempt_Epilogue'),              # task execution terminated successfully
        ('Preempt_Retry', 'Finalize_Retry'),                # task execution completed, task finalization failed
        ('Preempt_Retry', 'Resource_Epilogue'),             # task execution completed successfully
        ('Preempt_Retry', 'Resource_Epilogue_Retry'),       # task execution completed; error contacting forker component
        ('Preempting', 'Kill_Retry'),                       # kill; new signal, error contacting system component
        ('Preempting', 'Killing'),                          # kill; new signal and system component signaling task,
                                                            #     same signal used for preempt, or attempted signal demotion
        ('Preempting', 'Preempting'),                       # preemption timer expired, escalating to the next signal level
        ('Preempting', 'Preempt_Retry'),                    # preemption timer expired, error contacting system component
        ('Preempting', 'Preempt_Finalize_Retry'),           # task execution terminated, task finalization failed
        ('Preempting', 'Preempt_Epilogue'),                 # task execution complete/terminated; task finalization successful
        ('Preempt_Finalize_Retry', 'Preempt_Epilogue'),     # task finalization retry successful
        ('Preempt_Epilogue', 'Preempted'),                  # task execution complete/terminated, no holds pending
        ('Preempt_Epilogue', 'Preempted_Hold'),             # task execution complete/terminated, holds pending
        ('Preempt_Epilogue', 'Job_Epilogue'),               # task execution complete/terminated, kill pending
        ('Preempted', 'Resource_Prologue'),                 # run
        ('Preempted', 'Preempted_Hold'),                    # user/admin hold
        ('Preempted', 'Job_Epilogue'),                      # kill
        ('Preempted_Hold', 'Preempted'),                    # user/admin release, no holds
        ('Preempted_Hold', 'Job_Epilogue'),                 # kill
        ('Finalize_Retry', 'Resource_Epilogue'),            # task finalized and exit status obtained (if applicable)
        ('Finalize_Retry', 'Resource_Epilogue_Retry'),      # task finalized, exit status obtained, error contacting forker component
        ('Resource_Epilogue', 'Job_Epilogue'),              # resource epilogue scripts complete
        ('Resource_Epilogue', 'Resource_Epilogue_Retry'),   # Lost communication to forker during progress
        ('Resource_Epilogue', 'Job_Epilogue_Retry'),        # resource epilogue scripts complete; error contacting forker component
        ('Resource_Epilogue_Retry', 'Resource_Epilogue'),   # starting resource epilogue scripts
        ('Job_Epilogue', 'Terminal'),                       # job epilogue scripts complete
        ('Job_Epilogue_Retry', 'Job_Epilogue'),             # starting job_epilogue scripts
        ('Job_Epilogue', 'Job_Epilogue_Retry')              # Lost communication to forker during progress
        ]

def get_job_sm_initial_state():

    return "Ready"

def get_job_sm_events():

    return ['Run', 'Hold', 'Release', 'Preempt', 'Kill', 'Task_End']

def get_job_sm_seas(job):

    return {('Ready', 'Run') : [job._sm_ready__run],
            ('Ready', 'Hold') : [job._sm_ready__hold],
            ('Ready', 'Release') : [job._sm_ready__release],
            ('Ready', 'Kill') : [job._sm_ready__kill],
            ('Hold', 'Hold') : [job._sm_hold__hold],
            ('Hold', 'Release') : [job._sm_hold__release],
            ('Hold', 'Kill') : [job._sm_hold__kill],
            ('Job_Prologue', 'Progress') : [job._sm_job_prologue__progress],
            ('Job_Prologue', 'Hold') : [job._sm_common__pending_hold], 
            ('Job_Prologue', 'Release') : [job._sm_common__pending_release],
            ('Job_Prologue', 'Preempt') : [job._sm_common__pending_preempt], #custom?
            ('Job_Prologue', 'Kill') : [job._sm_common__pending_kill],
            ('Job_Prologue_Retry', 'Progress') : [job._sm_job_prologue_retry__progress],
            ('Job_Prologue_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Job_Prologue_Retry', 'Release') : [job._sm_common__pending_release],
            ('Job_Prologue_Retry', 'Preempt') : [job._sm_common__pending_preempt], #Must go pending
            ('Job_Prologue_Retry', 'Kill') : [job._sm_job_prologue_retry__kill],
            ('Job_Prologue_Retry_Release', 'Progress') : [job._sm_job_prologue_retry_release__progress],
            ('Job_Prologue_Retry_Release', 'Hold') : [job._sm_exit_common__hold],
            ('Job_Prologue_Retry_Release', 'Release') : [job._sm_exit_common__release],
            ('Job_Prologue_Retry_Release', 'Preempt') : [job._sm_exit_common__preempt],
            ('Job_Prologue_Retry_Release', 'Kill') : [job._sm_exit_common__kill],
            ('Resource_Prologue', 'Progress') : [job._sm_resource_prologue__progress],
            ('Resource_Prologue', 'Hold') : [job._sm_common__pending_hold],
            ('Resource_Prologue', 'Release') : [job._sm_common__pending_release],
            ('Resource_Prologue', 'Preempt') : [job._sm_common__pending_preempt], #custom?
            ('Resource_Prologue', 'Kill') : [job._sm_common__pending_kill],
            ('Resource_Prologue_Retry', 'Progress') : [job._sm_resource_prologue_retry__progress],
            ('Resource_Prologue_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Resource_Prologue_Retry', 'Release') : [job._sm_common__pending_release],
            ('Resource_Prologue_Retry', 'Preempt') : [job._sm_common__pending_preempt], #custom: go directly to preempted?
            ('Resource_Prologue_Retry', 'Kill') : [job._sm_resource_prologue_retry__kill],
            ('Resource_Prologue_Retry_Release', 'Progress') : [job._sm_resource_prologue_retry_release__progress],
            ('Resource_Prologue_Retry_Release', 'Hold') : [job._sm_exit_common__hold],
            ('Resource_Prologue_Retry_Release', 'Release') : [job._sm_exit_common__release],
            ('Resource_Prologue_Retry_Release', 'Preempt') : [job._sm_exit_common__preempt],
            ('Resource_Prologue_Retry_Release', 'Kill') : [job._sm_exit_common__kill],
            ('Run_Retry', 'Progress') : [job._sm_run_retry__progress],
            ('Run_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Run_Retry', 'Release') : [job._sm_common__pending_release],
            ('Run_Retry', 'Preempt') : [job._sm_common__pending_preempt],
            ('Run_Retry', 'Kill') : [job._sm_run_retry__kill],
            ('Running', 'Progress') : [job._sm_running__progress],
            ('Running', 'Hold') : [job._sm_common__pending_hold],
            ('Running', 'Release') : [job._sm_common__pending_release],
            ('Running', 'Preempt') : [job._sm_common__pending_preempt],
            ('Running', 'Kill') : [job._sm_running__kill],
            ('Running', 'Task_End') : [job._sm_running__task_end],
            ('Kill_Retry', 'Progress') : [job._sm_kill_retry__progress],
            ('Kill_Retry', 'Hold') : [job._sm_kill_common__hold],
            ('Kill_Retry', 'Release') : [job._sm_kill_common__release],
            ('Kill_Retry', 'Preempt') : [job._sm_kill_common__preempt],
            ('Kill_Retry', 'Kill') : [job._sm_kill_retry__kill],
            ('Kill_Retry', 'Task_End') : [job._sm_kill_retry__task_end],
            ('Killing', 'Progress') : [job._sm_killing__progress],
            ('Killing', 'Hold') : [job._sm_kill_common__hold],
            ('Killing', 'Release') : [job._sm_kill_common__release],
            ('Killing', 'Preempt') : [job._sm_kill_common__preempt],
            ('Killing', 'Kill') : [job._sm_killing__kill],
            ('Killing', 'Task_End') : [job._sm_killing__task_end],
            ('Preempt_Retry', 'Progress') : [job._sm_preempt_retry__progress],
            ('Preempt_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Preempt_Retry', 'Release') : [job._sm_common__pending_release],
            ('Preempt_Retry', 'Kill') : [job._sm_preempt_retry__kill],
            ('Preempt_Retry', 'Task_End') : [job._sm_preempt_retry__task_end],
            ('Preempting', 'Progress') : [job._sm_preempting__progress],
            ('Preempting', 'Hold') : [job._sm_common__pending_hold],
            ('Preempting', 'Release') : [job._sm_common__pending_release],
            ('Preempting', 'Kill') : [job._sm_preempting__kill],
            ('Preempting', 'Task_End') : [job._sm_preempting__task_end],
            ('Preempt_Finalize_Retry', 'Progress') : [job._sm_preempt_finalize_retry__progress],
            ('Preempt_Finalize_Retry', 'Hold') : [job._sm_common__pending_hold],
            ('Preempt_Finalize_Retry', 'Release') : [job._sm_common__pending_release],
            ('Preempt_Finalize_Retry', 'Kill') : [job._sm_common__pending_kill],
            ('Preempt_Epilogue', 'Progress') : [job._sm_preempt_epilogue__progress],
            ('Preempt_Epilogue', 'Hold') : [job._sm_common__pending_hold],
            ('Preempt_Epilogue', 'Release') : [job._sm_common__pending_release],
            ('Preempt_Epilogue', 'Kill') : [job._sm_common__pending_kill],
            ('Preempted', 'Run') : [job._sm_preempted__run],
            ('Preempted', 'Hold') : [job._sm_preempted__hold],
            ('Preempted', 'Release') : [job._sm_preempted__release],
            ('Preempted', 'Kill') : [job._sm_preempted__kill],
            ('Preempted_Hold', 'Hold') : [job._sm_preempted_hold__hold],
            ('Preempted_Hold', 'Release') : [job._sm_preempted_hold__release],
            ('Preempted_Hold', 'Kill') : [job._sm_preempted_hold__kill],
            ('Finalize_Retry', 'Progress') : [job._sm_finalize_retry__progress],
            ('Finalize_Retry', 'Hold') : [job._sm_exit_common__hold],
            ('Finalize_Retry', 'Release') : [job._sm_exit_common__release],
            ('Finalize_Retry', 'Preempt') : [job._sm_exit_common__preempt],
            ('Finalize_Retry', 'Kill') : [job._sm_exit_common__kill],
            ('Resource_Epilogue', 'Progress') : [job._sm_resource_epilogue__progress],
            ('Resource_Epilogue', 'Hold') : [job._sm_exit_common__hold],
            ('Resource_Epilogue', 'Release') : [job._sm_exit_common__release],
            ('Resource_Epilogue', 'Preempt') : [job._sm_exit_common__preempt],
            ('Resource_Epilogue', 'Kill') : [job._sm_exit_common__kill],
            ('Resource_Epilogue_Retry', 'Progress') : [job._sm_resource_epilogue_retry__progress],
            ('Resource_Epilogue_Retry', 'Hold') : [job._sm_exit_common__hold],
            ('Resource_Epilogue_Retry', 'Release') : [job._sm_exit_common__release],
            ('Resource_Epilogue_Retry', 'Preempt') : [job._sm_exit_common__preempt],
            ('Resource_Epilogue_Retry', 'Kill') : [job._sm_exit_common__kill],
            ('Job_Epilogue', 'Progress') : [job._sm_job_epilogue__progress],
            ('Job_Epilogue', 'Hold') : [job._sm_exit_common__hold],
            ('Job_Epilogue', 'Release') : [job._sm_exit_common__release],
            ('Job_Epilogue', 'Preempt') : [job._sm_exit_common__preempt],
            ('Job_Epilogue', 'Kill') : [job._sm_exit_common__kill],
            ('Job_Epilogue_Retry', 'Progress') : [job._sm_job_epilogue_retry__progress],
            ('Job_Epilogue_Retry', 'Hold') : [job._sm_exit_common__hold],
            ('Job_Epilogue_Retry', 'Release') : [job._sm_exit_common__release],
            ('Job_Epilogue_Retry', 'Preempt') : [job._sm_exit_common__preempt],
            ('Job_Epilogue_Retry', 'Kill') : [job._sm_exit_common__kill],
            }

class Job (StateMachine):
    """
    The Job class tracks a job, driving it at a high level.  Actual operations on the job, such as execution or
    termination, are the responsibility of the system component.
    """

    acctlog = Cobalt.Util.AccountingLog('qm')

    # properties for easier accounting logging
    ctime = property(lambda self: self.__timers['queue'].start_times[0])
    qtime = property(lambda self:
        self.__timers['current_queue'].start_times[0])
    start = property(lambda self: self.__timers['user'].start_times[-1])
    exec_host = property(lambda self: ":".join(self.__locations[-1]))
    end = property(lambda self: self.__timers['user'].stop_times[-1])

    fields = Data.fields + [
        "jobid", "jobname", "state", "attribute", "location", "starttime", 
        "submittime", "endtime", "queue", "type", "user",
        "walltime", "procs", "nodes", "mode", "cwd", "command", "args", 
        "outputdir", "project", "lienID", "stagein", "stageout",
        "reservation", "host", "port", "url", "stageid", "envs", "inputfile", 
        "kernel", "kerneloptions", "ion_kernel", "ion_kerneloptions", "admin_hold",
        "user_hold", "dependencies", "notify", "adminemail", "outputpath",
        "errorpath", "cobalt_log_file", "path", "preemptable", "preempts",
        "mintasktime", "maxtasktime", "maxcptime", "force_kill_delay", 
        "is_runnable", "is_active",
        "has_completed", "sm_state", "score", "attrs", "has_resources", 
        "exit_status", "dep_frac", "walltime_p", "user_list", "runid",
        "geometry"
    ]

    _states = get_job_sm_states() + StateMachine._states

    _transitions = get_job_sm_transitions()

    _initial_state = get_job_sm_initial_state()
    _events = get_job_sm_events() + StateMachine._events

    # return codes to improve typo detection.  by using the return codes in condition statements, a typo will result in a key
    # error rather than an incorrectly followed path.
    __rc_success = "success"
    __rc_retry = "retry"
    __rc_pg_create = "pg_create"
    __rc_xmlrpc = "xmlrpc"
    __rc_unknown = "unknown"

    def __init__(self, spec):
        self.initializing = True
        seas = get_job_sm_seas(self)

        # StateMachine.__init__(self, spec, seas = seas, terminal_actions = [(self._sm_terminal, {})])
        StateMachine.__init__(self, spec, seas = seas)

        logger.info(str(spec))

        self.jobid = spec.get("jobid")
        self.umask = spec.get("umask")
        self.jobname = spec.get("jobname", "N/A")
        self.attribute = spec.get("attribute", "compute")
        self.starttime = spec.get("starttime", "-1")
        self.submittime = spec.get("submittime", time.time())
        self.endtime = spec.get("endtime", "-1")
        self.type = spec.get("type", "mpish")
        self.user = spec.get("user")
        self.__walltime = int(float(spec.get("walltime", 0)))
        self.walltime_p = spec.get("walltime_p", self.walltime)    #  *AdjEst*
        self.procs = spec.get("procs")
        self.nodes = spec.get("nodes")
        self.cwd = spec.get("cwd")
        self.command = spec.get("command")
        self.args = spec.get("args", [])
        self.project = spec.get("project")
        self.lienID = spec.get("lienID")
        self.stagein = spec.get("stagein") #does nothing
        self.stageout = spec.get("stageout") #like ze goggles, it does nothing
        self.reservation = spec.get("reservation", False) #appears to be defunct.
        self.host = spec.get("host")
        self.port = spec.get("port")
        self.url = spec.get("url")
        self.stageid = spec.get("stageid")
        self.inputfile = spec.get("inputfile")
        self.tag = spec.get("tag", "job")
        self.kernel = spec.get("kernel", get_config_option('bgsystem', 'cn_default_kernel', 'default'))
        self.kerneloptions = spec.get("kerneloptions")
        self.ion_kernel = spec.get("ion_kernel",  get_config_option('bgsystem', 'ion_default_kernel', 'default'))
        self.ion_kerneloptions = spec.get("ion_kerneloptions")
        self.notify = spec.get("notify")
        self.adminemail = spec.get("adminemail")
        self.location = spec.get("location")
        self.__locations = []
        self.outputpath = spec.get("outputpath")
        if self.outputpath and self.jobname == 'N/A':
            jname = self.outputpath.split('/')[-1].split('.output')[0]
            if jname and jname != str(self.jobid):
                self.jobname = jname
        self.outputdir = spec.get("outputdir")
        self.errorpath = spec.get("errorpath")
        self.cobalt_log_file = spec.get("cobalt_log_file")
        if not self.cobalt_log_file:
            self.cobalt_log_file = "%s/%s.cobaltlog" % (self.outputdir, self.jobid)
        else:
            t = string.Template(self.cobalt_log_file)
            self.cobalt_log_file = t.safe_substitute(jobid=self.jobid)
        self.path = spec.get("path")
        self.mode = spec.get("mode", "co")
        self.envs = spec.get("envs", {})
        self.force_kill_delay = spec.get("force_kill_delay",
                get_cqm_config('force_kill_delay', DEFAULT_FORCE_KILL_DELAY))
        self.attrs = spec.get("attrs", {})

        self.score = float(spec.get("score", 0.0))

        self.__resource_nodects = []
        self.__timers = dict(
            queue = Timer(),
            current_queue = Timer(),
            user = Timer(),
        )

        # setting the queue will cause updated accounting records to be
        # written and the current queue timer to be restarted, so
        # this needs to be done only after the object has been initialized
        self.queue = spec.get("queue", "default")
        self.resid = None #Must be obtained from the scheduler component.
        self.__timers['queue'].start()
        self.etime = time.time()

        self.__admin_hold = False
        self.__user_hold = False
        self.__dep_hold = False

        # setting the hold flags will automatically cause the appropriate hold
        # events to be triggered, so this needs to be done
        # only after the object has been completely initialized
        if spec.get("admin_hold", False):
            self.admin_hold = True
        if spec.get("user_hold", False):
            self.user_hold = True

        self.dep_fail = False
        self.dep_frac = None #float(get_cqm_config('dep_frac', 0.5))
        self.all_dependencies = spec.get("all_dependencies")
        self.satisfied_dependencies = []
        if self.all_dependencies:
            self.all_dependencies = str(self.all_dependencies).split(":")
            logger.info("Job %s/%s: dependencies set to %s", self.jobid, self.user, ":".join(self.all_dependencies))
            # read the passed dependencies and set dep_hold if necessary
            self.update_dep_state()
        else:
            self.all_dependencies = []


        self.preemptable = spec.get("preemptable", False)
        self.__preempts = 0
        if self.preemptable:
            self.mintasktime = int(float(spec.get("mintasktime", 0)))
            self.maxtasktime = int(float(spec.get("maxtasktime", 0)))
            self.maxcptime = int(float(spec.get("maxcptime", 0)))

        self.geometry = spec.get("geometry", None)

        self.taskid = None
        self.task_running = False
        self.exit_status = None
        self.max_running = False

        self.total_etime = 0.0
        self.priority_core_hours = None
        self.user_list = spec.get('user_list', [self.user])

        # for improved script handling:
        self.job_prescript_ids = []
        self.job_postscript_ids = []
        self.resource_prescript_ids = []
        self.resource_postscript_ids = []

        self.runid = None #A job starts once.
        #prebooting is assumed for script jobs, unless the user requests that it be turned off
        #This is here for BG/Q ensemble job support.  --PMR
        self.script_preboot = spec.get("script_preboot", True) #preboot a block for a script.


        dbwriter.log_to_db(self.user, "creating", "job_data", JobDataMsg(self))
        if self.admin_hold:
            dbwriter.log_to_db(self.user, "admin_hold", "job_prog", JobProgMsg(self))
        if self.user_hold:
            dbwriter.log_to_db(self.user, "user_hold", "job_prog", JobProgMsg(self))

        self.current_task_start = time.time()

        self.initializing = False

    # end def __init__()

    def no_holds_left(self):
        '''Return True if no holds remain set on the job.'''
        return not (self.admin_hold or 
                self.user_hold or 
                self.dep_hold or 
                self.max_running)

    def __getstate__(self):
        data = {}
        for key, value in self.__dict__.iteritems():
            if key not in ['log', 'comms', 'acctlog']:
                data[key] = value
        return data

    def __setstate__(self, state):
        self.__dict__.update(state)

        # reset the state machine's state/event actions (seas)
        self.update_seas(get_job_sm_seas(self))

        # BRT: why is the current queue timer being reset?  if cqm is 
        # restarted, the job remained in the queue during that time, so I would
        # think that timer should continue to run during the restart rather
        # than be reset.
        if not self.__timers.has_key('current_queue'):
            self.__timers['current_queue'] = Timer()
            self.__timers['current_queue'].start()

        # special case to handle missing data from old state files
        if not state.has_key("total_etime"):
            logger.info("old job missing total_etime")
            self.total_etime = 0.0

        if not state.has_key("priority_core_hours"):
            logger.info("old job missing priority_core_hours")
            self.priority_core_hours = None

        if not state.has_key("dep_fail"):
            logger.info("old job missing dep_fail")
            self.dep_fail = False

        if not state.has_key("dep_frac"):
            logger.info("old job missing dep_frac")
            self.dep_frac = None

        if not state.has_key("user_list"):
            logger.info("old job missing user_list")
            self.user_list = [self.user]

        if not state.has_key("walltime_p"):
            logger.info("old job missing walltime_p")
            self.walltime_p = self.walltime

        if not state.has_key("resid"):
            logger.info("old job missing resid")
            self.resid = None

        if not state.has_key("job_prologue_ids"):
            self.job_prologue_ids = None
        if not state.has_key("job_epilogue_ids"):
            self.job_epilogue_ids = None
        if not state.has_key("resource_prologue_ids"):
            self.resource_prologue_ids = None
        if not state.has_key("resource_epilogue_ids"):
            self.resource_epilogue_ids = None
        if not state.has_key("geometry"):
            self.geometry = None
        if not state.has_key("script_preboot"):
            self.script_preboot = True
        if not state.has_key('ion_kernel'):
            #if ion kernel doesn't exist, neither will ion_kerneloptions
            self.ion_kernel = get_config_option('bgsystem', 'ion_default_kernel', 'default')
            self.ion_kerneloptions = get_config_option('bgsystem', 'ion_default_kernel_options', False)
            if self.ion_kerneloptions == False:
                self.ion_kerneloptions = None
        self.runid = state.get("runid", None)
        self.current_task_start = state.get("current_task_start", None)
        #for old statefiles, make sure to update the dependency state on restart:
        self.__dep_hold = False
        self.initializing = False

    def __task_signal(self, retry = True):
        '''send a signal to the managed task'''
        # BRT: this routine should probably check if the task could not be 
        # signaled because it was no longer running
        try:
            self._sm_log_info("instructing the system component to send signal %s" % (self.__signaling_info.signal,))
            pgroup = ComponentProxy("system").signal_process_groups([{'id':self.taskid}], self.__signaling_info.signal)
        except (ComponentLookupError, xmlrpclib.Fault), e:
            #
            # BRT: will a ComponentLookupError ever be raised directly or will 
            # it always be buried in an XML-RPC fault?
            #
            # BRT: shouldn't we be checking the XML-RPC fault code?  which 
            # fault codes are valid for this operation?  at the very least 
            # an unexpected fault code should be reported as such and the retry
            # loop broken.
            #
            if retry:
                self._sm_log_warn("failed to communicate with the system component (%s); retry pending" % (e,))
                return Job.__rc_retry
            else:
                self._sm_log_warn("failed to communicate with the system component (%s); manual cleanup may be required" % \
                    (e,))
                return Job.__rc_xmlrpc
        except:
            self._sm_raise_exception("unexpected error from the system component; manual cleanup may be required")
            return Job.__rc_unknown

        self.__signaled_info = self.__signaling_info
        del self.__signaling_info
        return Job.__rc_success

    def __task_run(self):
        global run_id_gen
        walltime = self.walltime
        if self.runid is None: #allow us to unset this for preemption
            self.runid = run_id_gen.next() #Don't try and run this twice.

        if self.preemptable and self.maxtasktime < walltime:
            walltime = self.maxtasktime
        try:
            self._sm_log_info("instructing the system component to begin executing the task")
            pgroup = ComponentProxy("system", retry=False).add_process_groups([{
                'id':"*",
                'jobid':self.jobid,
                'user':self.user,
                'stdin':self.inputfile,
                'stdout':self.outputpath,
                'stderr':self.errorpath,
                'cobalt_log_file':self.cobalt_log_file,
                'size':self.procs,
                'nodect':"*",
                'mode':self.mode,
                'script_preboot':self.script_preboot,
                'cwd':self.outputdir,
                'executable':self.command,
                'args':self.args,
                'env':self.envs,
                'location':self.location,
                'umask':self.umask,
                'kernel':self.kernel,
                'kerneloptions':self.kerneloptions,
                'ion_kernel':self.ion_kernel,
                'ion_kerneloptions':self.ion_kerneloptions,
                'starttime':self.starttime,
                'walltime':walltime,
                'killtime':self.force_kill_delay + 1,
                'resid': self.resid,
                'runid': self.runid,
                'attrs': self.attrs,
                'queue': self.queue,
                'project': self.project
            }])
            if pgroup[0].has_key('id'):
                self.taskid = pgroup[0]['id']
                if pgroup[0].has_key('nodect') and pgroup[0]['nodect'] != None:
                    self.__resource_nodects.append(pgroup[0]['nodect'])
                else:
                    self.__resource_nodects.append(self.nodes)
            else:
                self._sm_log_error("process group creation failed", 
                        cobalt_log = True)
                return Job.__rc_pg_create
        except (ComponentLookupError, xmlrpclib.Fault), e:
            self._sm_log_warn("failed to execute the task (%s); retry pending" % (e,))
            return Job.__rc_retry
        except:
            self._sm_raise_exception("unexpected error returned from the system component when attempting to add task",
                cobalt_log = True)
            return Job.__rc_unknown
        else:
            # Start task timer.  This corresponds to the compute time
            if self.walltime > 0:
                self.__max_job_timer = Timer(self.walltime * 60)
            else:
                self.__max_job_timer = Timer()
            self.__max_job_timer.start()
            self.current_task_start = time.time()
            task_start = accounting.task_start(self.jobid, self.runid, self.current_task_start, self.location)
            accounting_logger.info(task_start)
            logger.info(task_start)
        return Job.__rc_success

    def __task_finalize(self):
        '''get exit code from system component'''
        def end_time_and_log():
            self.__max_job_timer.stop()
            task_end = accounting.task_end(self.jobid, self.runid, self.__max_job_timer.elapsed_times[-1], self.current_task_start,
                    time.time(), self.location)
            self.current_task_start = None
            accounting_logger.info(task_end)
            logger.info(task_end)
        try:
            result = ComponentProxy("system").wait_process_groups([{'id':self.taskid, 'exit_status':'*'}])
            if result:
                self.exit_status = result[0].get('exit_status')
                dbwriter.log_to_db(None, "exit_status_update", "job_prog",
                    JobProgExitStatusMsg(self))
            else:
                self._sm_log_warn("system component was unable to locate the task; exit status not obtained")
        except (ComponentLookupError, xmlrpclib.Fault), e:
            self._sm_log_warn("failed to communicate with the system component (%s); retry pending" % (e,))
            return Job.__rc_retry
        except:
            # We aren't going into a retry and anything that doesn't return a retry "ends" the task and progresses towards
            # preemption/the job terminal action.  We need the log here.
            end_time_and_log()
            self._sm_raise_exception("unexpected error returned from the system component while finalizing task")
            return Job.__rc_unknown
        else:
            end_time_and_log()
        self.taskid = None
        return Job.__rc_success

    def __release_resources(self):
        '''release computing resources that may still be reserved by the job'''
        try:
            ComponentProxy("system").reserve_resources_until(self.location, None, self.jobid)
            return Job.__rc_success
        except (ComponentLookupError, xmlrpclib.Fault), e:
            self._sm_log_warn("failed to communicate with the system component (%s); retry pending" % (e,))
            return Job.__rc_retry
        except:
            self._sm_raise_exception("unexpected error returned from the system component while releasing resources")
            return Job.__rc_unknown

    def _sm_log_debug(self, msg, cobalt_log = False):
        '''write a debug message to the CQM log that includes state machine status'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        logger.debug("Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg))
        if cobalt_log:
            self.__write_cobalt_log("Debug: %s" % (msg,))

    def _sm_log_info(self, msg, cobalt_log = False):
        '''write an informational message to the CQM log that includes state machine status'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        logger.info("Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg))
        if cobalt_log:
            self.__write_cobalt_log("Info: %s" % (msg,))

    def _sm_log_warn(self, msg, cobalt_log = False):
        '''write a warning message to the CQM log that includes state machine status'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        logger.warning("Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg))
        if cobalt_log:
            self.__write_cobalt_log("Warning: %s" % (msg,))

    def _sm_log_error(self, msg, tb_flag = True, skip_tb_entries = 1, cobalt_log = False):
        '''write an error message to the CQM log that includes state machine status and a stack trace'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        full_msg = "Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg)
        if tb_flag:
            stack = traceback.format_stack()
            last_tb_entry = len(stack) - skip_tb_entries
            for entry in stack[:last_tb_entry]:
                full_msg += "\n    " + entry[:-1]
        logger.error(full_msg)
        if cobalt_log:
            self.__write_cobalt_log("ERROR: %s" % (msg,))

    def _sm_log_exception(self, msg, cobalt_log = False):
        '''write an error message to the CQM log that includes state machine status and the current exception traceback'''
        if self._sm_event != None:
            event_msg = "; Event=%s" % (self._sm_event,)
        else:
            event_msg = ""
        full_msg = "Job %s/%s: State=%s%s; %s" % (self.jobid, self.user, self._sm_state, event_msg, msg)
        (exc_cls, exc, tb) = sys.exc_info()
        exc_str = traceback.format_exception_only(exc_cls, exc)[0]
        full_msg += "\n    Exception: %s" % (exc_str)
        stack = traceback.format_tb(tb)
        for entry in stack:
            full_msg += "\n    " + entry[:-1]
        logger.error(full_msg)
        if cobalt_log:
            self.__write_cobalt_log("EXCEPTION: %s" % (full_msg,))

    def _sm_raise_exception(self, msg, cobalt_log = False):
        self._sm_log_error(msg, skip_tb_entries = 2, cobalt_log = cobalt_log)
        raise JobProcessingError(msg, self.jobid, self.user, self.state, self._sm_state, self._sm_event)

    def _sm_log_user_delete(self, signame, user = None, pending = False):
        if user != None:
            umsg = " by user %s" % (user,)
        else:
            umsg = ""
        if pending == True:
            pmsg = " now pending"
        else:
            pmsg = ""
        self._sm_log_info("user delete requested with signal %s%s%s" % (signame, umsg, pmsg), cobalt_log = True)

    def _sm_signaling_info_set_user_delete(self, signame = Signal_Map.terminate, user = None, pending = False):
        self.__signaling_info = Signal_Info(Signal_Info.Reason.delete, signame, user, pending)
        self._sm_log_user_delete(signame, user, pending)

    def _sm_check_job_timers(self):
        if self.__max_job_timer.has_expired:
            # if the job execution time has exceeded the wallclock time, then inform the task that it must terminate
            self._sm_log_info("maximum execution time exceeded; initiating job termination", cobalt_log = True)
            accounting_logger.info(accounting.abort(self.jobid))
            return Signal_Info(Signal_Info.Reason.time_limit, Signal_Map.terminate)
        else:
            return None

    def _sm_kill_task(self):
        '''initiate the user deletion of a task by signaling it and then changing to the appropriate state'''
        # BRT: this routine should probably check if the task could not be signaled because it was no longer running
        rc = self.__task_signal()
        if rc == Job.__rc_success:
            # start the signal timer so that the state machine knows when to escalate to sending a force kill signal
            if self.__signaled_info.signal != Signal_Map.force_kill: