Commit 1d882513 authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch 'cray-merge-cqm' into 'develop'

CQM changes for Cray Port

Queue manger changes.  There is a new queue restriction and a fix for what turns out to be a pretty major display bug due to a missing comma.

See merge request !9
parents c01ace65 84bac5c0
......@@ -2818,6 +2818,7 @@ class Job (StateMachine):
self.trigger_event('Release', {'type' : 'dep'})
def __get_job_state(self):
'''Translate the statemachine state into qstat-friendly states.'''
if self._sm_state in ('Ready', 'Preempted'):
if self.max_running:
return "maxrun_hold"
......@@ -2832,7 +2833,7 @@ class Job (StateMachine):
return "dep_hold"
return "admin_hold"
if self._sm_state in ['Job_Prologue','Job_Prologue_Retry',
if self._sm_state in ['Job_Prologue', 'Job_Prologue_Retry',
'Resource_Prologue', 'Resource_Prologue_Retry', 'Run_Retry']:
return "starting"
if self._sm_state == 'Running':
......@@ -2844,8 +2845,9 @@ class Job (StateMachine):
return 'preempting'
if self._sm_state == 'Preempted':
return 'preempted'
if self._sm_state in ['Job_Prologue_Retry_Release', 'Resource_Prologue_Retry_Release', 'Finalize_Retry',
'Resource_Epilogue','Resource_Epilogue_Retry', 'Job_Epilogue',
if self._sm_state in ['Job_Prologue_Retry_Release',
'Resource_Prologue_Retry_Release', 'Finalize_Retry',
'Resource_Epilogue', 'Resource_Epilogue_Retry', 'Job_Epilogue',
return "exiting"
if self._sm_state == 'Terminal':
......@@ -2874,9 +2876,10 @@ class Job (StateMachine):
if self._sm_state in ['Preempt_Retry', 'Preempting',
'Preempt_Finalize_Retry', 'Preempt_Epilogue', 'Preempted']:
return 'P'
if self._sm_state in ['Job_Prologue_Retry_Release', 'Resource_Prologue_Retry_Release', 'Finalize_Retry',
if self._sm_state in ['Job_Prologue_Retry_Release',
'Resource_Prologue_Retry_Release', 'Finalize_Retry',
'Resource_Epilogue', 'Resource_Epilogue_Retry', 'Job_Epilogue',
'Job_Epilogue_Retry' 'Terminal']:
'Job_Epilogue_Retry', 'Terminal']:
return 'E'
raise DataStateError, "unknown state: %s" % (self._sm_state,)
......@@ -3199,7 +3202,8 @@ class Restriction (Data):
__checks__ = {'maxtime':'maxwalltime', 'users':'usercheck', 'groups':'groupcheck',
'maxrunning':'maxuserjobs', 'mintime':'minwalltime',
'maxqueued':'maxqueuedjobs', 'maxusernodes':'maxusernodes',
'totalnodes':'maxtotalnodes', 'maxnodehours':'maxnodehours' }
'totalnodes':'maxtotalnodes', 'maxnodehours':'maxnodehours',
def __init__(self, spec, queue=None):
'''info could be like
......@@ -3211,7 +3215,7 @@ class Restriction (Data):
Data.__init__(self, spec) = spec.get("name")
if in ['maxrunning', 'maxusernodes', 'totalnodes']:
if in ['maxrunning', 'maxusernodes', 'totalnodes', 'maxtotaljobs']:
self.type = 'run'
self.type = spec.get("type", "queue")
......@@ -3274,6 +3278,15 @@ class Restriction (Data):
return (True, "")
def limittotaljobs(self, job, queuestate=None):
'''limits how many jobs this queue can run at once. Initially intended
for a "singleton queue". Most of this handling is in update_max_running'''
userjobs = [j for j in queuestate if j.has_resources and j.queue == job['queue']]
if len(userjobs) >= int(self.value):
return (False, "Max total running jobs limit reached")
return (True, "")
def maxqueuedjobs(self, job, _=None):
'''limits how many jobs a user can have in the queue at a time'''
userjobs = [j for j in if j.user == job['user']]
......@@ -3411,7 +3424,9 @@ class Queue (Data):
'''In order to keep the max_running property of jobs up to date, this function needs
to be called when a job starts running, or a new job appears in a queue.'''
if not self.restrictions.has_key("maxrunning"):
if not (self.restrictions.has_key("maxrunning") or
# if it *was* there and was removed, we better clean up
for job in
if job.max_running:
......@@ -3421,19 +3436,26 @@ class Queue (Data):
if job.no_holds_left():
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job))
job.max_running = False
unum = dict()
for job in[{'has_resources':True}]):
if job.user not in unum:
unum[job.user] = 1
unum[job.user] = unum[job.user] + 1
total_jobs = 0
for val in unum.values():
total_jobs += val
for job in
old = job.max_running
job.max_running = False
if unum.get(job.user, 0) >= int(self.restrictions["maxrunning"].value):
if (self.restrictions.has_key("maxrunning") and
unum.get(job.user, 0) >= int(self.restrictions["maxrunning"].value)):
if not job.has_resources:
job.max_running = True
if (self.restrictions.has_key("maxtotaljobs") and
total_jobs >= int(self.restrictions["maxtotaljobs"].value)):
if not job.has_resources:
job.max_running = True
if old != job.max_running:
......@@ -3685,7 +3707,7 @@ class QueueManager(Component):
if job.dep_frac is None:
job.dep_frac = float(get_cqm_config('dep_frac', 0.5))
new_score = min(float(waiting_job.nodes) / job.nodes, 1.) * job.dep_frac * job.score
new_score = min(float(waiting_job.nodes) / float(job.nodes), 1.) * job.dep_frac * job.score
new_score = (float(get_cqm_config('dep_frac', 0.5)) * job.score)
dbwriter.log_to_db(None, "dep_frac_update", "job_prog", JobProgDepFracMsg(job))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment