jobreader.py 2.26 KB
Newer Older
Michael Salim's avatar
Michael Salim committed
1
from collections import defaultdict
2
from django.conf import settings
3 4
from balsam.service import models
BalsamJob = models.BalsamJob
Michael Salim's avatar
Michael Salim committed
5

6
import logging
7
import uuid
8 9
logger = logging.getLogger(__name__)

Michael Salim's avatar
Michael Salim committed
10
class JobReader():
    '''Interface with BalsamJob DB & pull relevant jobs.

    Subclasses implement ``_get_jobs``. Callers invoke ``refresh_from_db``
    and then read ``jobs`` (or the ``by_states`` grouping).
    '''

    def __init__(self):
        # Start empty so ``by_states`` is usable before the first refresh.
        self.jobs = []

    @staticmethod
    def from_config(config):
        '''Constructor: pick the reader implied by ``config``.

        Returns a FileJobReader when ``config.job_file`` is truthy, otherwise
        a WFJobReader built from ``config.wf_name``.
        '''
        if config.job_file:
            return FileJobReader(config.job_file)
        return WFJobReader(config.wf_name)

    @property
    def by_states(self):
        '''dict of jobs keyed by state.

        Returns a ``defaultdict(list)``, so looking up a state with no jobs
        yields an empty list rather than raising KeyError.
        '''
        result = defaultdict(list)
        for job in self.jobs:
            result[job.state].append(job)
        return result

    def refresh_from_db(self):
        '''caller invokes this to read from DB.

        Replaces ``self.jobs`` with the filtered result of ``_get_jobs()``.
        '''
        self.jobs = self._filter(self._get_jobs())

    def _get_jobs(self):
        '''Return the candidate jobs; must be overridden by subclasses.'''
        raise NotImplementedError

    def _filter(self, job_queryset):
        '''Narrow ``job_queryset`` to jobs this site may still work on.

        Drops jobs whose state is in ``models.END_STATES`` and keeps only
        those whose ``allowed_work_sites`` contains ``settings.BALSAM_SITE``
        (case-insensitive substring match).
        '''
        jobs = job_queryset.exclude(state__in=models.END_STATES)
        jobs = jobs.filter(allowed_work_sites__icontains=settings.BALSAM_SITE)
        return jobs
Michael Salim's avatar
Michael Salim committed
38 39 40

    
class FileJobReader(JobReader):
    '''Limit to job PKs specified in a file. Used by metascheduler.'''

    def __init__(self, job_file):
        '''Remember ``job_file``; the file itself is read lazily.'''
        self.jobs = []
        self.job_file = job_file
        # Parsed on the first _get_jobs() call and reused afterwards.
        self.pk_list = None
        logger.info(f"Taking jobs from file {self.job_file}")

    def _get_jobs(self):
        '''Return the BalsamJobs whose ``job_id`` is listed in ``job_file``.

        The file is expected to hold whitespace-separated UUID strings.
        ``uuid.UUID`` raises ValueError on a malformed entry.
        '''
        if self.pk_list is None:
            # Context manager closes the handle promptly, even if read fails.
            with open(self.job_file) as fp:
                pk_strings = fp.read().split()
            self.pk_list = [uuid.UUID(pk) for pk in pk_strings]
        return BalsamJob.objects.filter(job_id__in=self.pk_list)


class WFJobReader(JobReader):
    '''Consume all jobs from DB, optionally matching a Workflow name.
    Will not consume jobs scheduled by metascheduler'''

    def __init__(self, wf_name):
        '''Remember ``wf_name``; a falsy value means "all workflows".'''
        self.jobs = []
        self.wf_name = wf_name
        if wf_name:
            logger.info(f"Consuming jobs from workflow {wf_name}")
        else:
            logger.info("Consuming all jobs from local DB")

    def _get_jobs(self):
        '''Return jobs for the configured workflow (or every job).

        Only jobs whose ``scheduler_id`` is the empty string are returned.
        '''
        objects = BalsamJob.objects
        wf = self.wf_name
        jobs = objects.filter(workflow=wf) if wf else objects.all()
        jobs = jobs.filter(scheduler_id__exact='')
        return jobs