Commit 75d8ac3b authored by Michael Salim's avatar Michael Salim

Using PriorityQueue for transitions: prioritize pre-run transitions to keep runners busy

parent 50f4727f
......@@ -42,11 +42,8 @@ You can find many settings to change. There are Django specific settings in `arg
To create and initialize the default sqlite3 database without password protections do:
```
./manage.py makemigrations argo
./manage.py makemigrations balsam
./manage.py migrate
./manage -h
```
......@@ -2,6 +2,7 @@
from collections import namedtuple
import glob
import multiprocessing
import multiprocessing.managers
import queue
import os
from io import StringIO
......@@ -49,7 +50,7 @@ POSTPROCESS_TIMEOUT_SECONDS = 300
SITE = settings.BALSAM_SITE
StatusMsg = namedtuple('StatusMsg', ['pk', 'state', 'msg'])
JobMsg = namedtuple('JobMsg', ['job', 'transition_function'])
JobMsg = namedtuple('JobMsg', ['priority', 'jobid'])
def on_exit():
logger.debug("TransitionProc caught SIGTERM: do nothing and wait for end")
......@@ -62,17 +63,21 @@ def main(job_queue, status_queue, lock):
while True:
logger.debug("Transition process waiting for job")
job_msg = job_queue.get()
job, transition_function = job_msg
priority, jobid = job_msg
if job == 'end':
if jobid == 'end':
logger.debug("Received end..quitting transition loop")
return
else:
job = BalsamJob.objects.get(pk=jobid)
transition_function = TRANSITIONS[job.state]
if job.work_site != SITE:
job.work_site = SITE
lock.acquire()
job.save(update_fields=['work_site'])
lock.release()
logger.debug(f"Received job {job.cute_id}: {transition_function}")
logger.debug(f"Received job {job.cute_id}: {transition_function} (priority {priority})")
try:
transition_function(job, lock)
except BalsamTransitionError as e:
......@@ -102,7 +107,17 @@ class TransitionProcessPool:
def __init__(self):
self.job_queue = multiprocessing.Queue()
# Use a priority queue to ensure pre-run transitions occur fast
handler = lambda a,b: on_exit()
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
class PQueueManager(multiprocessing.managers.SyncManager): pass
PQueueManager.register("PriorityQueue", queue.PriorityQueue)
self.pqueue_manager = PQueueManager()
self.pqueue_manager.start()
self.job_queue = self.pqueue_manager.PriorityQueue()
self.status_queue = multiprocessing.Queue()
self.lock = LockClass()
self.transitions_pk_list = []
......@@ -124,8 +139,9 @@ class TransitionProcessPool:
def add_job(self, job):
if job in self: raise BalsamTransitionError("already in transition")
if job.state not in TRANSITIONS: raise TransitionNotFoundError
transition_function = TRANSITIONS[job.state]
m = JobMsg(job, transition_function)
priority = PRIORITIES[job.state]
m = JobMsg(priority, job.pk)
self.job_queue.put(m)
self.transitions_pk_list.append(job.pk)
......@@ -139,6 +155,7 @@ class TransitionProcessPool:
break
def flush_job_queue(self):
return
while not self.job_queue.empty():
try:
self.job_queue.get_nowait()
......@@ -147,12 +164,14 @@ class TransitionProcessPool:
logger.debug("Flushed transition process job queue")
def end_and_wait(self):
m = JobMsg('end', None)
priority = PRIORITIES['end']
m = JobMsg(priority, 'end')
logger.debug("Sending end message and waiting on transition processes")
for proc in self.procs:
self.job_queue.put(m)
for proc in self.procs:
proc.join()
self.pqueue_manager.shutdown()
logger.info("All Transition processes joined: done.")
self.transitions_pk_list = []
......@@ -434,3 +453,12 @@ TRANSITIONS = {
'RUN_ERROR': handle_run_error,
'POSTPROCESSED': stage_out,
}
PRIORITIES = {
'end' : -1,
'READY': 0,
'STAGED_IN': 0,
'RUN_DONE': 1,
'RUN_TIMEOUT': 1,
'RUN_ERROR': 1,
'POSTPROCESSED': 1,
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment