Commit 956c9786 authored by Michael Salim's avatar Michael Salim
Browse files

FT bugfixes

parent 16518fe3
# HPC Edge Service
**Authors:** J. Taylor Childers (Argonne National Laboratory), Tom Uram (Argonne National Laboratory), Doug Benjamin (Duke University)
# HPC Edge Service and Workflow Management System
**Authors:** J. Taylor Childers (Argonne National Laboratory), Tom Uram (Argonne National Laboratory), Doug Benjamin (Duke University), Misha Salim (Argonne National Laboratory)
An HPC Edge Service to manage remote job submission. The goal of this service is to provide a secure interface for submitting jobs to large computing resources.
# Prerequisites
This Edge Service uses [RabbitMQ](https://www.rabbitmq.com/) to communicate between the outside (Argo) and inside (Balsam) services. This service must be running on an accessible server machine to use this Edge Service.
The Argo and Balsam services require Python 3.6, mpi4py, Django, and django-concurrency.
To establish the needed environment on Cooley or Theta, it is recommended to use Anaconda:
### Cooley:
```
soft add +anaconda
conda config --add channels intel
conda create --name balsam intelpython3_full python=3
source ~/.conda/envs/balsam/bin/activate balsam
pip install django django-concurrency
```
### Additional steps on Theta:
cp -i /opt/cray/pe/mpt/7.6.0/gni/mpich-intel-abi/16.0/lib/libmpi* ~/.conda/envs/balsam/lib/
# “yes” to overwrite libmpi.so.12 and libmpifort.so.12
export LD_LIBRARY_PATH=~/.conda/envs/balsam/lib:$LD_LIBRARY_PATH
# Installation
```
git clone git@github.com:hep-cce/hpc-edge-service.git
git clone git@xgitlab.cels.anl.gov:turam/hpc-edge-service.git
cd hpc-edge-service
source activate balsam
virtualenv argobalsam_env
source argobalsam_env/bin/activate
pip install pip --upgrade
......
......@@ -131,12 +131,13 @@ def main(args, transition_pool, runner_group, job_source):
logger.info(f"Queued transition: {job.cute_id} will undergo {fxn}")
any_finished = runner_group.update_and_remove_finished()
if any_finished: wait = False
job_source.refresh_from_db()
if time.time() - last_created > 5:
created = create_new_runners(job_source.jobs, runner_group, worker_group)
if created:
last_created = time.time()
if any_finished or created: wait = False
wait = False
if wait: next(delay_timer)
def on_exit(runner_group, transition_pool, job_source):
......
import os
import random
import getpass
import sys
import signal
import time
......@@ -15,22 +16,70 @@ from tests.BalsamTestCase import create_job, create_app
BALSAM_TEST_DIR = os.environ['BALSAM_TEST_DIRECTORY']
def launcher_processes(keywords):
if type(keywords) == str: keywords = [keywords]
username = getpass.getuser()
searchcmd = 'ps aux | grep '
searchcmd += ' | grep '.join(f'"{k}"' for k in keywords)
print("Search command: ", searchcmd)
grep = subprocess.Popen(searchcmd, shell=True, stdout=subprocess.PIPE)
stdout,stderr = grep.communicate()
stdout = stdout.decode('utf-8')
processes = [line for line in stdout.split('\n') if 'python' in line and line.split()[0]==username]
return processes
def sig_processes(process_lines, signal):
for line in process_lines:
proc = int(line.split()[1])
try:
os.kill(proc, signal)
except ProcessLookupError:
print("Could not find proc", line)
else:
print("Sent signal", signal, "to", proc)
def run_launcher_until(function, period=1.0, timeout=60.0):
print("DOING POPEN")
launcher_proc = subprocess.Popen(['balsam', 'launcher', '--consume',
'--max-ranks-per-node', '8'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid)
print("POLLING")
success = poll_until_returns_true(function, period=period, timeout=timeout)
print("POLL DONE")
print("These are the running launcher.py processes:")
processes = launcher_processes('launcher.py --consume')
print("\n".join(processes))
sig_processes(processes, signal.SIGTERM)
def check_processes_done():
procs = launcher_processes('launcher.py --consume')
return len(procs) == 0
poll_until_returns_true(check_processes_done, period=2, timeout=12)
processes = launcher_processes('launcher.py --consume')
if processes:
print("These are the launcher.py that did not end on SIGTERM:")
print("\n".join(processes))
sig_processes(processes, signal.SIGKILL)
# WEIRDEST BUG IN TESTING IF YOU OMIT THE FOLLOIWNG STATEMENT!
# launcher_proc.terminate() doesn't work; the process keeps on running and
# then you have two launchers from different test cases processing the same
# job...Very hard to catch bug.
os.killpg(os.getpgid(launcher_proc.pid), signal.SIGTERM) # Send the signal to all the process groups
time.sleep(10)
try: os.killpg(os.getpgid(launcher_proc.pid), signal.SIGKILL) # Send the signal to all the process groups
except ProcessLookupError: pass
#os.killpg(os.getpgid(launcher_proc.pid), signal.SIGTERM) # Send the signal to all the process groups
#time.sleep(10)
#try: os.killpg(os.getpgid(launcher_proc.pid), signal.SIGKILL) # Send the signal to all the process groups
#except ProcessLookupError: pass
#processes = launcher_processes()
#print("These launchers failed to die:")
#print("\n".join(processes))
return success
def run_launcher_seconds(seconds):
......@@ -537,6 +586,7 @@ class TestDAG(BalsamTestCase):
j.refresh_from_db()
return all(j.state=='JOB_FINISHED' for j in (parent,chA,chB))
print("running launcher until finished")
success = run_launcher_until(check)
self.assertTrue(success)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment