Commit 84bac5c0 authored by Paul Rich's avatar Paul Rich
Browse files

CQM changes for Cray Port

parent cb0ce41e
...@@ -2818,6 +2818,7 @@ class Job (StateMachine): ...@@ -2818,6 +2818,7 @@ class Job (StateMachine):
self.trigger_event('Release', {'type' : 'dep'}) self.trigger_event('Release', {'type' : 'dep'})
def __get_job_state(self): def __get_job_state(self):
'''Translate the statemachine state into qstat-friendly states.'''
if self._sm_state in ('Ready', 'Preempted'): if self._sm_state in ('Ready', 'Preempted'):
if self.max_running: if self.max_running:
return "maxrun_hold" return "maxrun_hold"
...@@ -2832,20 +2833,21 @@ class Job (StateMachine): ...@@ -2832,20 +2833,21 @@ class Job (StateMachine):
else: else:
return "dep_hold" return "dep_hold"
return "admin_hold" return "admin_hold"
if self._sm_state in ['Job_Prologue','Job_Prologue_Retry', if self._sm_state in ['Job_Prologue', 'Job_Prologue_Retry',
'Resource_Prologue', 'Resource_Prologue_Retry', 'Run_Retry']: 'Resource_Prologue', 'Resource_Prologue_Retry', 'Run_Retry']:
return "starting" return "starting"
if self._sm_state == 'Running': if self._sm_state == 'Running':
return "running" return "running"
if self._sm_state in ['Kill_Retry', 'Killing']: if self._sm_state in ['Kill_Retry', 'Killing']:
return "killing" return "killing"
if self._sm_state in ['Preempt_Retry', 'Preempting', if self._sm_state in ['Preempt_Retry', 'Preempting',
'Preempt_Finalize_Retry', 'Preempt_Epilogue']: 'Preempt_Finalize_Retry', 'Preempt_Epilogue']:
return 'preempting' return 'preempting'
if self._sm_state == 'Preempted': if self._sm_state == 'Preempted':
return 'preempted' return 'preempted'
if self._sm_state in ['Job_Prologue_Retry_Release', 'Resource_Prologue_Retry_Release', 'Finalize_Retry', if self._sm_state in ['Job_Prologue_Retry_Release',
'Resource_Epilogue','Resource_Epilogue_Retry', 'Job_Epilogue', 'Resource_Prologue_Retry_Release', 'Finalize_Retry',
'Resource_Epilogue', 'Resource_Epilogue_Retry', 'Job_Epilogue',
'Job_Epilogue_Retry']: 'Job_Epilogue_Retry']:
return "exiting" return "exiting"
if self._sm_state == 'Terminal': if self._sm_state == 'Terminal':
...@@ -2871,12 +2873,13 @@ class Job (StateMachine): ...@@ -2871,12 +2873,13 @@ class Job (StateMachine):
return "R" return "R"
if self._sm_state in ['Kill_Retry', 'Killing']: if self._sm_state in ['Kill_Retry', 'Killing']:
return "K" return "K"
if self._sm_state in ['Preempt_Retry', 'Preempting', if self._sm_state in ['Preempt_Retry', 'Preempting',
'Preempt_Finalize_Retry', 'Preempt_Epilogue', 'Preempted']: 'Preempt_Finalize_Retry', 'Preempt_Epilogue', 'Preempted']:
return 'P' return 'P'
if self._sm_state in ['Job_Prologue_Retry_Release', 'Resource_Prologue_Retry_Release', 'Finalize_Retry', if self._sm_state in ['Job_Prologue_Retry_Release',
'Resource_Prologue_Retry_Release', 'Finalize_Retry',
'Resource_Epilogue', 'Resource_Epilogue_Retry', 'Job_Epilogue', 'Resource_Epilogue', 'Resource_Epilogue_Retry', 'Job_Epilogue',
'Job_Epilogue_Retry' 'Terminal']: 'Job_Epilogue_Retry', 'Terminal']:
return 'E' return 'E'
raise DataStateError, "unknown state: %s" % (self._sm_state,) raise DataStateError, "unknown state: %s" % (self._sm_state,)
...@@ -3199,7 +3202,8 @@ class Restriction (Data): ...@@ -3199,7 +3202,8 @@ class Restriction (Data):
__checks__ = {'maxtime':'maxwalltime', 'users':'usercheck', 'groups':'groupcheck', __checks__ = {'maxtime':'maxwalltime', 'users':'usercheck', 'groups':'groupcheck',
'maxrunning':'maxuserjobs', 'mintime':'minwalltime', 'maxrunning':'maxuserjobs', 'mintime':'minwalltime',
'maxqueued':'maxqueuedjobs', 'maxusernodes':'maxusernodes', 'maxqueued':'maxqueuedjobs', 'maxusernodes':'maxusernodes',
'totalnodes':'maxtotalnodes', 'maxnodehours':'maxnodehours' } 'totalnodes':'maxtotalnodes', 'maxnodehours':'maxnodehours',
'maxtotaljobs':'limittotaljobs'}
def __init__(self, spec, queue=None): def __init__(self, spec, queue=None):
'''info could be like '''info could be like
...@@ -3211,7 +3215,7 @@ class Restriction (Data): ...@@ -3211,7 +3215,7 @@ class Restriction (Data):
''' '''
Data.__init__(self, spec) Data.__init__(self, spec)
self.name = spec.get("name") self.name = spec.get("name")
if self.name in ['maxrunning', 'maxusernodes', 'totalnodes']: if self.name in ['maxrunning', 'maxusernodes', 'totalnodes', 'maxtotaljobs']:
self.type = 'run' self.type = 'run'
else: else:
self.type = spec.get("type", "queue") self.type = spec.get("type", "queue")
...@@ -3274,6 +3278,15 @@ class Restriction (Data): ...@@ -3274,6 +3278,15 @@ class Restriction (Data):
else: else:
return (True, "") return (True, "")
def limittotaljobs(self, job, queuestate=None):
'''limits how many jobs this queue can run at once. Initially intended
for a "singleton queue". Most of this handling is in update_max_running'''
userjobs = [j for j in queuestate if j.has_resources and j.queue == job['queue']]
if len(userjobs) >= int(self.value):
return (False, "Max total running jobs limit reached")
else:
return (True, "")
def maxqueuedjobs(self, job, _=None): def maxqueuedjobs(self, job, _=None):
'''limits how many jobs a user can have in the queue at a time''' '''limits how many jobs a user can have in the queue at a time'''
userjobs = [j for j in self.queue.jobs if j.user == job['user']] userjobs = [j for j in self.queue.jobs if j.user == job['user']]
...@@ -3411,7 +3424,9 @@ class Queue (Data): ...@@ -3411,7 +3424,9 @@ class Queue (Data):
'''In order to keep the max_running property of jobs up to date, this function needs '''In order to keep the max_running property of jobs up to date, this function needs
to be called when a job starts running, or a new job appears in a queue.''' to be called when a job starts running, or a new job appears in a queue.'''
if not self.restrictions.has_key("maxrunning"):
if not (self.restrictions.has_key("maxrunning") or
self.restrictions.has_key("maxtotaljobs")):
# if it *was* there and was removed, we better clean up # if it *was* there and was removed, we better clean up
for job in self.jobs: for job in self.jobs:
if job.max_running: if job.max_running:
...@@ -3421,29 +3436,36 @@ class Queue (Data): ...@@ -3421,29 +3436,36 @@ class Queue (Data):
if job.no_holds_left(): if job.no_holds_left():
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job)) dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job))
job.max_running = False job.max_running = False
else:
return unum = dict()
unum = dict() for job in self.jobs.q_get([{'has_resources':True}]):
for job in self.jobs.q_get([{'has_resources':True}]): if job.user not in unum:
if job.user not in unum: unum[job.user] = 1
unum[job.user] = 1
else:
unum[job.user] = unum[job.user] + 1
for job in self.jobs:
old = job.max_running
job.max_running = False
if unum.get(job.user, 0) >= int(self.restrictions["maxrunning"].value):
if not job.has_resources:
job.max_running = True
if old != job.max_running:
logger.info("Job %s/%s: max_running set to %s", job.jobid, job.user, job.max_running)
if job.max_running:
dbwriter.log_to_db(None, "maxrun_hold", "job_prog", JobProgMsg(job))
else: else:
dbwriter.log_to_db(None, "maxrun_hold_release", "job_prog", JobProgMsg(job)) unum[job.user] = unum[job.user] + 1
if job.no_holds_left(): total_jobs = 0
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job)) for val in unum.values():
total_jobs += val
for job in self.jobs:
old = job.max_running
job.max_running = False
if (self.restrictions.has_key("maxrunning") and
unum.get(job.user, 0) >= int(self.restrictions["maxrunning"].value)):
if not job.has_resources:
job.max_running = True
if (self.restrictions.has_key("maxtotaljobs") and
total_jobs >= int(self.restrictions["maxtotaljobs"].value)):
if not job.has_resources:
job.max_running = True
if old != job.max_running:
logger.info("Job %s/%s: max_running set to %s", job.jobid, job.user, job.max_running)
if job.max_running:
dbwriter.log_to_db(None, "maxrun_hold", "job_prog", JobProgMsg(job))
else:
dbwriter.log_to_db(None, "maxrun_hold_release", "job_prog", JobProgMsg(job))
if job.no_holds_left():
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job))
class QueueDict(DataDict): class QueueDict(DataDict):
item_cls = Queue item_cls = Queue
...@@ -3685,7 +3707,7 @@ class QueueManager(Component): ...@@ -3685,7 +3707,7 @@ class QueueManager(Component):
if job.dep_frac is None: if job.dep_frac is None:
job.dep_frac = float(get_cqm_config('dep_frac', 0.5)) job.dep_frac = float(get_cqm_config('dep_frac', 0.5))
if CQM_SCALE_DEP_FRAC: if CQM_SCALE_DEP_FRAC:
new_score = min(float(waiting_job.nodes) / job.nodes, 1.) * job.dep_frac * job.score new_score = min(float(waiting_job.nodes) / float(job.nodes), 1.) * job.dep_frac * job.score
else: else:
new_score = (float(get_cqm_config('dep_frac', 0.5)) * job.score) new_score = (float(get_cqm_config('dep_frac', 0.5)) * job.score)
dbwriter.log_to_db(None, "dep_frac_update", "job_prog", JobProgDepFracMsg(job)) dbwriter.log_to_db(None, "dep_frac_update", "job_prog", JobProgDepFracMsg(job))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment