Commit 126de615 authored by Michael Salim's avatar Michael Salim
Browse files

Shortened "balsam ls" application printing. Added functionality to parse times from state history

parent ad390fe6
import os import os
import json import json
import logging import logging
import re
import sys import sys
from datetime import datetime from datetime import datetime
import uuid import uuid
import time
from django.core.exceptions import ValidationError,ObjectDoesNotExist from django.core.exceptions import ValidationError,ObjectDoesNotExist
from django.conf import settings from django.conf import settings
...@@ -69,6 +69,18 @@ JOB_FINISHED ...@@ -69,6 +69,18 @@ JOB_FINISHED
FAILED FAILED
USER_KILLED USER_KILLED
PARENT_KILLED'''.split() PARENT_KILLED'''.split()
STATE_TIME_PATTERN = re.compile(r'''
^ # start of line
\[ # opening square bracket
(\d+-\d+-\d\d\d\d # date MM-DD-YYYY
\s+ # one or more space
\d+:\d+:\d+) # time HH:MM:SS
\s+ # one or more space
(\w+) # state
\s* # 0 or more space
\] # closing square bracket
''', re.VERBOSE | re.MULTILINE)
def assert_disjoint(): def assert_disjoint():
groups = [ACTIVE_STATES, PROCESSABLE_STATES, RUNNABLE_STATES, END_STATES] groups = [ACTIVE_STATES, PROCESSABLE_STATES, RUNNABLE_STATES, END_STATES]
...@@ -334,6 +346,10 @@ auto timeout retry: {self.auto_timeout_retry} ...@@ -334,6 +346,10 @@ auto timeout retry: {self.auto_timeout_retry}
return f"[{self.name} | { str(self.pk)[:8] }]" return f"[{self.name} | { str(self.pk)[:8] }]"
else: else:
return f"[{ str(self.pk)[:8] }]" return f"[{ str(self.pk)[:8] }]"
@staticmethod
def short_exe(exe):
return " ".join(os.path.basename(p) for p in exe.split())
@property @property
def app_cmd(self): def app_cmd(self):
...@@ -455,7 +471,8 @@ auto timeout retry: {self.auto_timeout_retry} ...@@ -455,7 +471,8 @@ auto timeout retry: {self.auto_timeout_retry}
def get_line_string(self): def get_line_string(self):
recent_state = self.get_recent_state_str() recent_state = self.get_recent_state_str()
app = self.application if self.application else self.direct_command app = self.application if self.application else self.direct_command
return f' {str(self.pk):36} | {self.name:26} | {self.workflow:26} | {app:26} | {recent_state}' app = self.short_exe(app)
return f' {str(self.pk):36} | {self.name:20} | {self.workflow:20} | {app:36} | {recent_state}'
def runtime_str(self): def runtime_str(self):
minutes, seconds = divmod(self.runtime_seconds, 60) minutes, seconds = divmod(self.runtime_seconds, 60)
...@@ -466,7 +483,13 @@ auto timeout retry: {self.auto_timeout_retry} ...@@ -466,7 +483,13 @@ auto timeout retry: {self.auto_timeout_retry}
@staticmethod @staticmethod
def get_header(): def get_header():
return f' {"job_id":36} | {"name":26} | {"workflow":26} | {"application":26} | {"latest update"}' return f' {"job_id":36} | {"name":20} | {"workflow":20} | {"application":36} | {"latest update"}'
def get_state_times(self):
matches = STATE_TIME_PATTERN.findall(self.state_history)
return {state: datetime.strptime(timestr, TIME_FMT)
for timestr, state in matches
}
def create_working_path(self): def create_working_path(self):
top = settings.BALSAM_WORK_DIRECTORY top = settings.BALSAM_WORK_DIRECTORY
...@@ -556,16 +579,18 @@ Envs: {self.environ_vars} ...@@ -556,16 +579,18 @@ Envs: {self.environ_vars}
'''.strip() + '\n' '''.strip() + '\n'
def get_line_string(self): def get_line_string(self):
format = ' %20s | %20s | %20s | %20s | %s ' format = ' %20s | %30s | %20s | %20s | %s '
output = format % (self.name, self.executable, output = format % (self.name,
self.default_preprocess, self.short_exe(self.executable),
self.default_postprocess, self.short_exe(self.default_preprocess),
self.description) self.short_exe(self.default_postprocess),
self.description
)
return output return output
@staticmethod @staticmethod
def get_header(): def get_header():
format = ' %20s | %20s | %20s | %20s | %s ' format = ' %20s | %30s | %20s | %20s | %s '
output = format % ('name', 'executable', output = format % ('name', 'executable',
'preprocess', 'postprocess', 'preprocess', 'postprocess',
'description') 'description')
...@@ -574,3 +599,7 @@ Envs: {self.environ_vars} ...@@ -574,3 +599,7 @@ Envs: {self.environ_vars}
@property @property
def cute_id(self): def cute_id(self):
return f"[{self.name} | { str(self.pk)[:8] }]" return f"[{self.name} | { str(self.pk)[:8] }]"
@staticmethod
def short_exe(exe):
return " ".join(os.path.basename(p) for p in exe.split())
import datetime
import os import os
import pprint
import re
import sys import sys
from importlib.util import find_spec from importlib.util import find_spec
from balsam.service.models import BalsamJob from balsam.service.models import BalsamJob, TIME_FMT
from tests.BalsamTestCase import BalsamTestCase from tests.BalsamTestCase import BalsamTestCase
from tests.BalsamTestCase import create_job, create_app from tests.BalsamTestCase import create_job, create_app
from tests import util from tests import util
def state_hist_pattern(state):
return re.compile(f'''
^ # start of line
\[ # opening square bracket
(\d+-\d+-\d\d\d\d # date MM-DD-YYYY
\s+ # one or more space
\d+:\d+:\d+) # time HH:MM:SS
\s+ # one or more space
{state} # state
\s* # 0 or more space
\] # closing square bracket
''',
re.VERBOSE | re.MULTILINE
)
p_ready = state_hist_pattern('READY')
p_pre = state_hist_pattern('PREPROCESSED')
p_rundone = state_hist_pattern('RUN_DONE')
p_finished = state_hist_pattern('JOB_FINISHED')
class TestNoOp(BalsamTestCase): class TestNoOp(BalsamTestCase):
...@@ -21,11 +43,13 @@ class TestNoOp(BalsamTestCase): ...@@ -21,11 +43,13 @@ class TestNoOp(BalsamTestCase):
if num_nodes[-1] != max_workers: if num_nodes[-1] != max_workers:
num_nodes.append(max_workers) num_nodes.append(max_workers)
rpn = [64] #rpn = [64]
jpn = [64, 256, 1024] #jpn = [64, 256, 1024]
rpn = [16]
jpn = [64, 128]
self.experiments = itertools.product(num_nodes, rpn, jpn) self.experiments = itertools.product(num_nodes, rpn, jpn)
def serial_expt(self, num_nodes, rpn, jpn): def create_serial_expt(self, num_nodes, rpn, jpn):
BalsamJob.objects.all().delete() BalsamJob.objects.all().delete()
num_jobs = num_nodes * jpn num_jobs = num_nodes * jpn
...@@ -35,6 +59,28 @@ class TestNoOp(BalsamTestCase): ...@@ -35,6 +59,28 @@ class TestNoOp(BalsamTestCase):
BalsamJob.objects.bulk_create(jobs) BalsamJob.objects.bulk_create(jobs)
self.assertEqual(BalsamJob.objects.count(), num_jobs) self.assertEqual(BalsamJob.objects.count(), num_jobs)
def process_job_times(self):
state_data = BalsamJob.objects.values_list('state_history', flat=True)
ready_times = (p_ready.search(jobhist).group(1) for jobhist in statedata)
ready_times = [datetime.strptime(time_str, TIME_FMT) for time_str in ready_times]
time0 = min(ready_times)
ready_times = [(t - time0).seconds for t in ready_times]
print("Ready Times")
pprint(ready_times)
finished_times = (p_finished.search(jobhist).group(1) for jobhist in statedata)
finished_times = [datetime.strptime(time_str, TIME_FMT) for time_str in finished_times]
finished_times = [(t - time0).seconds for t in finished_times]
print("Finished Times")
pprint(finished_times)
def test_serial(self): def test_serial(self):
done_query = BalsamJob.objects.filter(state='JOB_FINISHED')
for (num_nodes, rpn, jpn) in self.experiments: for (num_nodes, rpn, jpn) in self.experiments:
self.serial_expt(num_nodes, rpn, jpn) self.create_serial_expt(num_nodes, rpn, jpn)
num_jobs = num_nodes * jpn
success = util.run_launcher_until(lambda: done_query.count() == num_jobs)
self.assertEqual(done_query.count(), num_jobs)
process_job_times()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment