Commit badd48f6 authored by Michael Salim's avatar Michael Salim
Browse files

Test utility for generating job state history time CDFs

BalsamJob Model stores fractional seconds in state history
Serial experiment: independent hello world jobs. Auto-writes CDFs to
benchmark data directory.
parent 069f56ab
......@@ -18,7 +18,7 @@ class InvalidStateError(ValidationError): pass
class InvalidParentsError(ValidationError): pass
class NoApplication(Exception): pass
TIME_FMT = '%m-%d-%Y %H:%M:%S'
TIME_FMT = '%m-%d-%Y %H:%M:%S.%f'
STATES = '''
CREATED
......@@ -75,7 +75,7 @@ STATE_TIME_PATTERN = re.compile(r'''
\[ # opening square bracket
(\d+-\d+-\d\d\d\d # date MM-DD-YYYY
\s+ # one or more space
\d+:\d+:\d+) # time HH:MM:SS
\d+:\d+:\d+\.\d+) # time HH:MM:SS.MICROSEC
\s+ # one or more space
(\w+) # state
\s* # 0 or more space
......
......@@ -42,12 +42,7 @@ class TestInsertion(BalsamTestCase):
def test_concurrent_mpi_insert(self):
'''Timing: many MPI ranks simultaneously call dag.add_job'''
base = os.path.join(util.DATA_DIR, 'concurrent_insert.dat')
resultpath = base
i = 1
while os.path.exists(resultpath):
resultpath = f"{base}.{i}"
i += 1
resultpath = util.benchmark_outfile_path('concurrent_insert.dat')
title = 'test_concurrent_mpi_insert'
comment = 'Each rank simultaneously calls dag.add_job (num_ranks simultaneous insertions)'
......
import datetime
import os
import pprint
import re
import sys
from datetime import datetime
from importlib.util import find_spec
from balsam.service.models import BalsamJob, TIME_FMT
from balsam.service.models import BalsamJob
from tests.BalsamTestCase import BalsamTestCase
from tests.BalsamTestCase import create_job, create_app
from tests import util
def state_hist_pattern(state):
    '''Compile a regex matching one state-history entry for the given state.

    Matches entries of the form "[MM-DD-YYYY HH:MM:SS.USEC  STATE]" and
    captures the timestamp (group 1), suitable for datetime.strptime with
    TIME_FMT.  Fractional seconds are required, mirroring STATE_TIME_PATTERN
    in balsam.service.models, which now records them.
    '''
    # rf-string: raw so the regex escapes (\[, \d, \s) are not treated as
    # (invalid) string escapes, f so the state name is interpolated.
    return re.compile(rf'''
    ^                     # start of line
    \[                    # opening square bracket
    (\d+-\d+-\d\d\d\d     # date MM-DD-YYYY
    \s+                   # one or more space
    \d+:\d+:\d+\.\d+)     # time HH:MM:SS.MICROSEC
    \s+                   # one or more space
    {state}               # state
    \s*                   # 0 or more space
    \]                    # closing square bracket
    ''',
    re.VERBOSE | re.MULTILINE
    )
# Pre-compiled matchers for the four state-history transitions inspected by
# the tests below.
# NOTE(review): state_hist_pattern's timestamp regex must agree with the
# TIME_FMT used when state_history entries are written -- TIME_FMT gained
# fractional seconds in this change, so confirm the two stay in sync.
p_ready = state_hist_pattern('READY')
p_pre = state_hist_pattern('PREPROCESSED')
p_rundone = state_hist_pattern('RUN_DONE')
p_finished = state_hist_pattern('JOB_FINISHED')
class TestNoOp(BalsamTestCase):
def setUp(self):
    '''Build the experiment grid: (num_nodes, ranks-per-node, jobs-per-node).

    Node counts are powers of two up to the launcher's worker count; the
    worker count itself is appended when it is not itself a power of two.
    '''
    from itertools import product
    self.launcherInfo = util.launcher_info()
    max_workers = self.launcherInfo.num_workers

    num_nodes = [2**n for n in range(0, 13) if 2**n <= max_workers]
    if num_nodes[-1] != max_workers:
        num_nodes.append(max_workers)

    rpn = [64]
    jpn = [16]
    self.experiments = product(num_nodes, rpn, jpn)
def create_serial_expt(self, num_nodes, rpn, jpn):
'''Populate DB with set number of dummy serial jobs; no deps'''
BalsamJob.objects.all().delete()
num_jobs = num_nodes * jpn
......@@ -59,28 +35,30 @@ class TestNoOp(BalsamTestCase):
BalsamJob.objects.bulk_create(jobs)
self.assertEqual(BalsamJob.objects.count(), num_jobs)
def process_job_times(self):
    '''Print, for each job, the seconds elapsed from the earliest READY
    transition to that job's READY and JOB_FINISHED transitions.'''
    state_data = BalsamJob.objects.values_list('state_history', flat=True)

    ready_times = (p_ready.search(jobhist).group(1) for jobhist in state_data)
    ready_times = [datetime.strptime(time_str, TIME_FMT) for time_str in ready_times]
    time0 = min(ready_times)  # earliest READY is the reference point
    # total_seconds(): .seconds alone silently drops the days component
    ready_times = [(t - time0).total_seconds() for t in ready_times]
    print("Ready Times")
    pprint.pprint(ready_times)

    finished_times = (p_finished.search(jobhist).group(1) for jobhist in state_data)
    finished_times = [datetime.strptime(time_str, TIME_FMT) for time_str in finished_times]
    finished_times = [(t - time0).total_seconds() for t in finished_times]
    print("Finished Times")
    pprint.pprint(finished_times)
def test_serial(self):
    '''Populate DB, run launcher, get timing data from job histories

    Serial: all jobs pack into MPIEnsembles and can run concurrently.
    Writes a CDF table of state-transition times to the benchmark data dir.'''
    done_query = BalsamJob.objects.filter(state='JOB_FINISHED')

    for (num_nodes, rpn, jpn) in self.experiments:
        title = f'{num_nodes}nodes_{rpn}rpn_{jpn}jpn'
        self.create_serial_expt(num_nodes, rpn, jpn)
        num_jobs = num_nodes * jpn

        # Record the launch time so job times are measured from launcher start
        launcher_start_time = datetime.now()
        success = util.run_launcher_until(lambda: done_query.count() == num_jobs,
                                          timeout=1000, maxrpn=rpn)
        self.assertEqual(done_query.count(), num_jobs)

        time_data = util.process_job_times(time0=launcher_start_time)
        self.assertEqual(len(time_data['PREPROCESSED']), num_jobs)
        self.assertEqual(len(time_data['JOB_FINISHED']), num_jobs)

        cdf_table = util.print_jobtimes_cdf(time_data)
        resultpath = util.benchmark_outfile_path('serial_no_op.dat')
        with open(resultpath, 'w') as fp:
            title = f'# {num_nodes} nodes, {rpn} rpn, {jpn} jpn ({num_jobs} total jobs)'
            comment = 'All jobs pack into MPIEnsembles and can run concurrently'
            fp.write(util.FormatTable.create_header(title, comment))
            fp.write(cdf_table)
......@@ -157,7 +157,8 @@ class FormatTable:
self.widths[col] = max(self.widths[col], len(field)+4)
self.rows.append(row)
def create_header(self, title, comment):
@staticmethod
def create_header(title, comment):
from django.conf import settings
header = ''
cobalt_envs = {k:v for k,v in os.environ.items() if 'COBALT' in k}
......@@ -184,3 +185,58 @@ class FormatTable:
zip(row, self.columns))
table += "\n"
return table+"\n"
def process_job_times(time0=None):
    '''Collect per-state transition times for every BalsamJob in the DB.

    Returns {state: sorted list of elapsed seconds at which each job reached
    that state}, measured relative to time0 (or to the earliest READY time
    when time0 is None).  Useful for building CDFs of completion times and
    profiling job processing throughput.
    '''
    from balsam.service.models import TIME_FMT, BalsamJob, STATE_TIME_PATTERN
    from collections import defaultdict
    from datetime import datetime

    histories = '\n'.join(
        BalsamJob.objects.values_list('state_history', flat=True)
    )

    time_data = defaultdict(list)
    for match in STATE_TIME_PATTERN.finditer(histories):
        stamp, state = match.groups()
        time_data[state].append(datetime.strptime(stamp, TIME_FMT))

    if time0 is None:
        time0 = min(time_data['READY'])
    for state in time_data:
        time_data[state] = [(t - time0).total_seconds()
                            for t in sorted(time_data[state])]
    return time_data
def print_jobtimes_cdf(job_times):
    '''Format a time-gridded CDF table of job state-transition counts.

    job_times: dict mapping state name -> list of elapsed seconds at which
    each job reached that state (as returned by process_job_times).

    Returns a table string with one row per 0.1-second grid point; columns
    are the cumulative counts of jobs that had reached READY, PREPROCESSED,
    RUN_DONE, and JOB_FINISHED by that time.
    '''
    import numpy as np

    max_time = round(max(job_times['JOB_FINISHED']) + 1.5)
    time_grid = np.arange(0, max_time, 0.1)

    def cumulative_counts(times):
        # searchsorted(side='right') on the sorted times gives, for each grid
        # point t, the number of entries <= t: O((n+g) log n) instead of the
        # naive O(n*g) per-grid-point comparison loop.
        return np.searchsorted(np.sort(np.asarray(times)), time_grid, side='right')

    count_ready = cumulative_counts(job_times['READY'])
    count_pre = cumulative_counts(job_times['PREPROCESSED'])
    count_rundone = cumulative_counts(job_times['RUN_DONE'])
    count_jobfin = cumulative_counts(job_times['JOB_FINISHED'])

    result = "# Time num_ready num_preprocessed num_run_done num_job_finished\n"
    for time, *counts in zip(time_grid, count_ready, count_pre, count_rundone, count_jobfin):
        result += f'{time:8.3f} {" ".join("%3d" % c for c in counts)}\n'
    return result
def benchmark_outfile_path(file_basename):
    """Return a path under DATA_DIR that does not collide with an existing file.

    If DATA_DIR/file_basename already exists, numeric suffixes .1, .2, ...
    are tried in order and the first unused candidate is returned.
    """
    base = os.path.join(DATA_DIR, file_basename)
    candidate, suffix = base, 0
    while os.path.exists(candidate):
        suffix += 1
        candidate = f"{base}.{suffix}"
    return candidate
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment