Commit a1643dae authored by Paul Rich's avatar Paul Rich
Browse files

Merge branch '19-singleton-queue' into 'master'

maxtotaljobs limit added.

This adds a limiter for the maximum number of jobs that may run in a queue
overall.  It is useful for profiling machines with noisy network environments.
This also adds output to cqadm for this information, and an entry in the cqadm
manpage.

See merge request !10
parents 47062f4a 26963d8b
......@@ -133,6 +133,9 @@ Send an email at the start and stop of every job run through the specified queue
.B maxrunning=x
The maximum number of jobs a user is allowed to have running in the queue.
.TP
.B maxtotaljobs=x
The maximum number of jobs a queue is allowed to run at once.
.TP
.B maxusernodes=x
The maximum number of nodes a user is allowed to have allocated with running jobs in the queue.
.TP
......
......@@ -156,12 +156,12 @@ def getq(info):
if que['mintime'] is not None:
que['mintime'] = "%02d:%02d:00" % (divmod(int(que.get('mintime')), 60))
header = [('Queue', 'Users', 'Groups', 'MinTime', 'MaxTime', 'MaxRunning',
'MaxQueued', 'MaxUserNodes', 'MaxNodeHours', 'TotalNodes',
'AdminEmail', 'State', 'Cron', 'Policy', 'Priority')]
'MaxTotalJobs', 'MaxQueued', 'MaxUserNodes', 'MaxNodeHours',
'TotalNodes', 'AdminEmail', 'State', 'Cron', 'Policy', 'Priority')]
datatoprint = [(que['name'], que['users'], que['groups'],
que['mintime'], que['maxtime'],
que['maxrunning'], que['maxqueued'],
que['maxusernodes'], que['maxnodehours'],
que['maxrunning'], que['maxtotaljobs'], que['maxqueued'],
que['maxusernodes'], que['maxnodehours'],
que['totalnodes'],
que['adminemail'], que['state'],
que['cron'], que['policy'], que['priority'])
......@@ -192,7 +192,7 @@ def process_cqadm_options(jobs, parser, spec, user):
force = parser.options.force # force flag.
info = [{'tag':'queue', 'name':'*', 'state':'*', 'users':'*', 'groups':'*', 'maxtime':'*', 'mintime':'*', 'maxuserjobs':'*',
'maxusernodes':'*', 'maxqueued':'*', 'maxrunning':'*', 'maxnodehours':'*', 'adminemail':'*',
'maxusernodes':'*', 'maxqueued':'*', 'maxrunning':'*','maxtotaljobs':'*', 'maxnodehours':'*', 'adminemail':'*',
'totalnodes':'*', 'cron':'*', 'policy':'*', 'priority':'*'}]
response = []
......
......@@ -3199,7 +3199,8 @@ class Restriction (Data):
__checks__ = {'maxtime':'maxwalltime', 'users':'usercheck', 'groups':'groupcheck',
'maxrunning':'maxuserjobs', 'mintime':'minwalltime',
'maxqueued':'maxqueuedjobs', 'maxusernodes':'maxusernodes',
'totalnodes':'maxtotalnodes', 'maxnodehours':'maxnodehours' }
'totalnodes':'maxtotalnodes', 'maxnodehours':'maxnodehours',
'maxtotaljobs':'limittotaljobs'}
def __init__(self, spec, queue=None):
'''info could be like
......@@ -3211,7 +3212,7 @@ class Restriction (Data):
'''
Data.__init__(self, spec)
self.name = spec.get("name")
if self.name in ['maxrunning', 'maxusernodes', 'totalnodes']:
if self.name in ['maxrunning', 'maxusernodes', 'totalnodes', 'maxtotaljobs']:
self.type = 'run'
else:
self.type = spec.get("type", "queue")
......@@ -3274,6 +3275,15 @@ class Restriction (Data):
else:
return (True, "")
def limittotaljobs(self, job, queuestate=None):
    '''Enforce the queue-wide cap on simultaneously running jobs.

    Intended for "singleton"-style queues where only a fixed number of
    jobs may run at once, regardless of which users own them.  Most of
    the related hold bookkeeping happens in update_max_running.

    job -- job spec; only its 'queue' entry is read here
    queuestate -- iterable of job objects (possibly spanning several
        queues); only jobs holding resources in the same queue as *job*
        count against the limit
    Returns a (passed, message) tuple: (True, "") when the job may run,
    (False, reason) when the cap has already been reached.
    '''
    running_here = sum(1 for j in queuestate
                       if j.has_resources and j.queue == job['queue'])
    if running_here < int(self.value):
        return (True, "")
    return (False, "Max total running jobs limit reached")
def maxqueuedjobs(self, job, _=None):
'''limits how many jobs a user can have in the queue at a time'''
userjobs = [j for j in self.queue.jobs if j.user == job['user']]
......@@ -3411,7 +3421,9 @@ class Queue (Data):
'''In order to keep the max_running property of jobs up to date, this function needs
to be called when a job starts running, or a new job appears in a queue.'''
if not self.restrictions.has_key("maxrunning"):
if not (self.restrictions.has_key("maxrunning") or
self.restrictions.has_key("maxtotaljobs")):
# if it *was* there and was removed, we better clean up
for job in self.jobs:
if job.max_running:
......@@ -3421,29 +3433,36 @@ class Queue (Data):
if job.no_holds_left():
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job))
job.max_running = False
return
unum = dict()
for job in self.jobs.q_get([{'has_resources':True}]):
if job.user not in unum:
unum[job.user] = 1
else:
unum[job.user] = unum[job.user] + 1
for job in self.jobs:
old = job.max_running
job.max_running = False
if unum.get(job.user, 0) >= int(self.restrictions["maxrunning"].value):
if not job.has_resources:
job.max_running = True
if old != job.max_running:
logger.info("Job %s/%s: max_running set to %s", job.jobid, job.user, job.max_running)
if job.max_running:
dbwriter.log_to_db(None, "maxrun_hold", "job_prog", JobProgMsg(job))
else:
unum = dict()
for job in self.jobs.q_get([{'has_resources':True}]):
if job.user not in unum:
unum[job.user] = 1
else:
dbwriter.log_to_db(None, "maxrun_hold_release", "job_prog", JobProgMsg(job))
if job.no_holds_left():
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job))
unum[job.user] = unum[job.user] + 1
total_jobs = 0
for val in unum.values():
total_jobs += val
for job in self.jobs:
old = job.max_running
job.max_running = False
if (self.restrictions.has_key("maxrunning") and
unum.get(job.user, 0) >= int(self.restrictions["maxrunning"].value)):
if not job.has_resources:
job.max_running = True
if (self.restrictions.has_key("maxtotaljobs") and
total_jobs >= int(self.restrictions["maxtotaljobs"].value)):
if not job.has_resources:
job.max_running = True
if old != job.max_running:
logger.info("Job %s/%s: max_running set to %s", job.jobid, job.user, job.max_running)
if job.max_running:
dbwriter.log_to_db(None, "maxrun_hold", "job_prog", JobProgMsg(job))
else:
dbwriter.log_to_db(None, "maxrun_hold_release", "job_prog", JobProgMsg(job))
if job.no_holds_left():
dbwriter.log_to_db(None, "all_holds_clear", "job_prog", JobProgMsg(job))
class QueueDict(DataDict):
item_cls = Queue
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment