Commit 7fb9f45e authored by Michael Salim's avatar Michael Salim
Browse files

* Restructured balsam database and data/logging folders

* User explicitly creates balsam DB directory with "balsam init"
* User launches DB server with "balsam dbserver"
* user_settings is vastly simplified...moved all configuration logic
into django_config module
* Database is auto-configured on django startup; will connect to DB
server automatically if client
parent 6672de77
......@@ -18,3 +18,5 @@ experiments
docs/_build/*
docs/_static/*
*.egg-info
default_balsamdb
import argparse
from importlib.util import find_spec
import glob
import os
import sys
import signal
import subprocess
from balsam.django_config.settings import resolve_db_path
from serverinfo import ServerInfo
CHECK_PERIOD = 4
TERM_LINGER = 30
PYTHON = sys.executable
SQLITE_SERVER = find_spec('balsam.django_config.sqlite_server').origin
DB_COMMANDS = {
'sqlite3' : f'{PYTHON} {SQLITE_SERVER}',
'postgres': f'',
'mysql' : f'',
}
def run(cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return proc
def stop(proc):
print("Killing Balsam server process")
proc.terminate()
try: retcode = proc.wait(timeout=3)
except subprocess.TimeoutExpired: proc.kill()
def main(db_path):
serverinfo = ServerInfo(db_path)
serverinfo.reset_server_address()
server_type = serverinfo['db_type']
db_cmd = f"BALSAM_DB_PATH={db_path} " + DB_COMMANDS[server_type].format(**serverinfo.data)
print("Starting balsam DB server daemon for DB at {db_path}")
print(db_cmd)
sys.exit(0)
proc = run(db_cmd)
# On SIGINT/SIGTERM, start the clock & quit after TERM_LINGER sec
term_start = 0
def handle_term(signum, stack):
global term_start
if term_start == 0: term_start = time.time()
# On SIGUSR1, stop immediately ("balsam server --stop" does this)
def handle_stop(signum, stack):
stop(proc)
serverinfo['address'] = None
sys.exit(0)
signal.signal(signal.SIGINT, handle_term)
signal.signal(signal.SIGTERM, handle_term)
signal.signal(signal.SIGUSR1, handle_stop)
while not term_start or time.time() - term_start < TERM_LINGER:
try:
retcode = proc.wait(timeout=CHECK_PERIOD)
except subprocess.TimeoutExpired:
pass
else:
print("server process stopped unexpectedly; restarting")
serverinfo.reset_server_address()
db_cmd = f"BALSAM_DB_PATH={db_path} " + DB_COMMANDS[server_type].format(**serverinfo.data)
proc = run(db_cmd)
stop(proc)
serverinfo.update({'address': None})
if __name__ == "__main__":
os.environ['IS_SERVER_DAEMON']="True"
input_path = sys.argv[1] if len(sys.argv) == 2 else None
db_path = resolve_db_path(input_path)
main(db_path)
import json
import os
import socket
ADDRESS_FNAME = 'dbwriter_address'
class ServerInfo:
def __init__(self, balsam_db_path):
self.path = os.path.join(balsam_db_path, ADDRESS_FNAME)
self.data = {}
if not os.path.exists(self.path):
self.update(self.data)
else:
self.refresh()
if self.data.get('address') and os.environ.get('IS_SERVER_DAEMON')=='True':
raise RuntimeError(f"Running server address is already posted at {self.path}"
' (use "balsam server --stop" to shut it down)')
def get_free_port_and_address(self):
hostname = socket.gethostname()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 0))
port = int(sock.getsockname()[1])
sock.close()
address = f'tcp://{hostname}:{port}'
return address
def get_sqlite3_info(self):
new_address = self.get_free_port_and_address()
info = dict(db_type='sqlite3', address=new_address)
return info
def reset_server_address(self):
db = self['db_type']
info = getattr(self, f'get_{db}_info')()
self.update(info)
def update(self, update_dict):
self.refresh()
self.data.update(update_dict)
with open(self.path, 'w') as fp:
fp.write(json.dumps(self.data))
def get(self, key, default=None):
if key in self.data:
return self.data[key]
else:
return default
def refresh(self):
if not os.path.exists(self.path): return
with open(self.path, 'r') as fp:
self.data = json.loads(fp.read())
def __getitem__(self, key):
if self.data is None: self.refresh()
return self.data[key]
def __setitem__(self, key, value):
self.update({key:value})
......@@ -10,18 +10,136 @@ For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.9/ref/settings/
"""
import os,logging
logger = logging.getLogger(__name__)
import os
import sys
import logging
from balsam.django_config import serverinfo, sqlite_client
from balsam.user_settings import *
logger = logging.getLogger(__name__)
# ---------------
# DATABASE SETUP
# ---------------
def resolve_db_path(path=None):
if path:
assert os.path.exists(path)
elif os.environ.get('BALSAM_DB_PATH'):
path = os.environ['BALSAM_DB_PATH']
assert os.path.exists(path)
else:
path = default_db_path
return path
def configure_db_backend(db_path):
ENGINES = {
'sqlite3' : 'django.db.backends.sqlite3',
}
NAMES = {
'sqlite3' : 'db.sqlite3',
}
OPTIONS = {
'sqlite3' : {'timeout' : 5000},
}
info = serverinfo.ServerInfo(db_path)
db_type = info['db_type']
user = info.get('user', '')
password = info.get('password', '')
db_name = os.path.join(db_path, NAMES[db_type])
db = dict(ENGINE=ENGINES[db_type], NAME=db_name,
OPTIONS=OPTIONS[db_type], USER=user, PASSWORD=password)
DATABASES = {'default':db}
return DATABASES
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CONCURRENCY_ENABLED = True
BALSAM_PATH = resolve_db_path()
DATABASES = configure_db_backend(BALSAM_PATH)
# -----------------------
# SQLITE CLIENT SETUP
# ------------------------
is_server = os.environ.get('IS_BALSAM_SERVER')=='True'
using_sqlite = DATABASES['default']['ENGINE'].endswith('sqlite3')
SAVE_CLIENT = None
if using_sqlite and not is_server:
SAVE_CLIENT = sqlite_client.Client(serverinfo.ServerInfo(BALSAM_PATH))
if SAVE_CLIENT.serverAddr is None:
logger.debug("SQLite client: writing straight to disk")
SAVE_CLIENT = None
else:
logger.debug(f"SQL client: save() via {client.serverAddr}")
# --------------------
# SUBDIRECTORY SETUP
# --------------------
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOGGING_DIRECTORY = os.path.join(BALSAM_PATH , 'log')
DATA_PATH = os.path.join(BALSAM_PATH ,'data')
BALSAM_WORK_DIRECTORY = os.path.join(DATA_PATH,'balsamjobs') # where to store local job data used for submission
ARGO_WORK_DIRECTORY = os.path.join(DATA_PATH,'argojobs')
for d in [
BALSAM_PATH ,
DATA_PATH,
LOGGING_DIRECTORY,
BALSAM_WORK_DIRECTORY,
ARGO_WORK_DIRECTORY
]:
if not os.path.exists(d):
os.makedirs(d)
# ----------------
# LOGGING SETUP
# ----------------
HANDLER_FILE = os.path.join(LOGGING_DIRECTORY, LOG_FILENAME)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format' : '%(asctime)s|%(process)d|%(levelname)8s|%(name)s:%(lineno)s] %(message)s',
'datefmt' : "%d-%b-%Y %H:%M:%S"
},
},
'handlers': {
'console': {
'class':'logging.StreamHandler',
'formatter': 'standard',
'level' : 'DEBUG'
},
'default': {
'level':LOG_HANDLER_LEVEL,
'class':'logging.handlers.RotatingFileHandler',
'filename': HANDLER_FILE,
'maxBytes': LOG_FILE_SIZE_LIMIT,
'backupCount': LOG_BACKUP_COUNT,
'formatter': 'standard',
}
},
'loggers': {
'django':{
'handlers': ['default'],
'level': 'DEBUG',
'propagate': True,
},
'balsam': {
'handlers': ['default'],
'level': 'DEBUG',
'propagate': True,
},
}
}
def log_uncaught_exceptions(exctype, value, tb,logger=logger):
logger.error(f"Uncaught Exception {exctype}: {value}",exc_info=(exctype,value,tb))
logger = logging.getLogger('console')
logger.error(f"Uncaught Exception {exctype}: {value}",exc_info=(exctype,value,tb))
sys.excepthook = log_uncaught_exceptions
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.9/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '=gyp#o9ac0@w3&-^@a)j&f#_n-o=k%z2=g5u@z5+klmh_*hebj'
......@@ -31,7 +149,6 @@ DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
......
from io import StringIO
from traceback import print_exc
import json
import os
import logging
import zmq
from django.db.utils import OperationalError
REQ_TIMEOUT = 10000 # 10 seconds
REQ_RETRY = 3
logger = logging.getLogger(__name__)
class Client:
def __init__(self, server_info):
self.server_info = server_info
self.serverAddr = self.server_info.get('address')
if self.serverAddr:
response = self.send_request('TEST_ALIVE', timeout=3000)
if response != 'ACK':
logger.exception(f"sqlite client cannot reach DB write server")
raise RuntimeError("Cannot reach DB write server")
def send_request(self, msg, timeout=None):
if timeout is None:
timeout = REQ_TIMEOUT
context = zmq.Context(1)
poll = zmq.Poller()
for retry in range(REQ_RETRY):
client = context.socket(zmq.REQ)
client.connect(self.serverAddr)
poll.register(client, zmq.POLLIN)
client.send_string(msg)
socks = dict(poll.poll(timeout))
if socks.get(client) == zmq.POLLIN:
reply = client.recv()
context.term()
return reply.decode('utf-8')
else:
logger.debug("No response from server, retrying...")
client.setsockopt(zmq.LINGER, 0)
client.close()
poll.unregister(client)
self.server_info.refresh()
self.serverAddr = self.server_info['address']
context.term()
raise OperationalError(f"Sqlite client save request failed after "
f"{REQ_RETRY} retries: is the server down?")
def save(self, job, force_insert=False, force_update=False, using=None, update_fields=None):
serial_data = job.serialize(force_insert=force_insert,
force_update=force_update, using=using,
update_fields=update_fields)
logger.info(f"client: sending request for save of {job.cute_id}")
response = self.send_request(serial_data)
assert response == 'ACK_SAVE'
from io import StringIO
from traceback import print_exc
import json
import os
import logging
import time
import zmq
from socket import gethostname
import signal
from django.conf import settings
from django.db.utils import OperationalError
from balsam.django_config import serverinfo
logger = logging.getLogger(__name__)
SERVER_PERIOD = 1000
TERM_LINGER = 20 # if SIGTERM, wait 20 sec after final save() to exit
os.environ['IS_BALSAM_SERVER']="True"
os.environ['IS_SERVER_DAEMON']="False"
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
class ZMQServer:
def __init__(self, db_path):
# connect to local sqlite DB thru ORM
import django
django.setup()
from balsam.service.models import BalsamJob
self.BalsamJob = BalsamJob
self.info = serverinfo.ServerInfo(db_path)
self.address = self.info['address']
port = int(self.address.split(':'][2])
self.context = zmq.Context(1)
self.socket = self.context.socket(zmq.REP)
self.socket.bind(f'tcp://*:{port}')
logger.info(f"db_writer bound to socket @ {self.address}")
def recv_request(self):
events = self.socket.poll(timeout=SERVER_PERIOD)
if events:
message = self.socket.recv().decode('utf-8')
else:
message = None
return message
def send_reply(self, msg):
self.socket.send_string(msg)
def save(self, job_msg):
d = json.loads(job_msg)
job = self.BalsamJob.from_dict(d)
force_insert = d['force_insert']
force_update = d['force_update']
using = d['using']
update_fields = d['update_fields']
job.save(force_insert, force_update, using, update_fields)
logger.info(f"db_writer Saved {job.cute_id}")
return time.time()
def server_main(db_path):
logger.debug("hello from server_main")
parent_pid = os.getppid()
terminate = False
def handler(signum, stack):
global terminate
terminate = True
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
server = ZMQServer(db_path)
last_save_time = time.time()
while not terminate or time.time() - last_save_time < TERM_LINGER:
message = server.recv_request()
if message is None:
if os.getppid() != parent_pid:
logger.info("detected parent died; server quitting")
break
elif 'job_id' in message:
last_save_time = server.save(message)
server.send_reply("ACK_SAVE")
else:
server.send_reply("ACK")
if __name__ == "__main__":
db_path = os.environ['BALSAM_DB_PATH']
try:
server_main(db_path)
except:
buf = StringIO()
print_exc(file=buf)
logger.exception(f"db_writer Uncaught exception:\n%s", buf.getvalue())
finally:
logger.info("exiting server main")
# These must come before any other imports
import django
import os
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
django.setup()
# --------------
import argparse
import sys
from balsam.scripts.cli_commands import newapp,newjob,newdep,ls,modify,rm,qsub
from balsam.scripts.cli_commands import kill,mkchild,launcher,service,make_dummies
from django.conf import settings
from balsam.scripts.cli_commands import dbserver, init
def main():
parser = make_parser()
......@@ -48,7 +44,6 @@ def make_parser():
# ADD JOB
# -------
BALSAM_SITE = settings.BALSAM_SITE
parser_job = subparsers.add_parser('job',
help="add a new Balsam job",
description="add a new Balsam job",
......@@ -76,7 +71,7 @@ def make_parser():
'job).')
parser_job.add_argument('--allowed-site', action='append',
required=False, default=[BALSAM_SITE],
required=False, default=[],
help="Balsam instances where this job can run; "
"defaults to the local Balsam instance")
......@@ -227,7 +222,7 @@ def make_parser():
type=int, required=True)
parser_mkchild.add_argument('--allowed-site', action='append',
required=False, default=[BALSAM_SITE],
required=False, default=[],
help="Balsam instances where this job can run; "
"defaults to the local Balsam instance")
......@@ -302,6 +297,28 @@ def make_parser():
parser_launcher.add_argument('--daemon', action='store_true')
parser_launcher.set_defaults(func=launcher)
# -----------------
# DBSERVER
# --------
parser_dbserver = subparsers.add_parser('dbserver', help="Start/stop database server process")
group = parser_dbserver.add_mutually_exclusive_group(required=False)
group.add_argument('--start', action='store_true',
default=True, help="Start the DB server")
group.add_argument('--stop', action='store_true',
default=False, help="Kill the DB server")
parser_dbserver.add_argument('--path', type=str, default='',
help="Balsam DB directory path")
parser_dbserver.set_defaults(func=dbserver)
# -----------------
# INIT
# --------
parser_init = subparsers.add_parser('init', help="Create new balsam DB")
parser_init.add_argument('path', help="Path to Balsam DB directory")
parser_init.add_argument('--db-type', choices=['sqlite3'],
default='sqlite3', help="choose backend to use")
parser_init.set_defaults(func=init)
# -----------------
# SERVICE
......
import getpass
import os
from importlib.util import find_spec
import subprocess
import signal
import sys
from django.conf import settings
from balsam.service import models
from balsam.launcher import dag
import balsam.scripts.ls_commands as lscmd
import django
Job = models.BalsamJob
AppDef = models.ApplicationDefinition
def ls_procs(keywords):
if type(keywords) == str: keywords = [keywords]
username = getpass.getuser()
searchcmd = 'ps aux | grep '
searchcmd += ' | grep '.join(f'"{k}"' for k in keywords)
grep = subprocess.Popen(searchcmd, shell=True, stdout=subprocess.PIPE)
stdout,stderr = grep.communicate()
stdout = stdout.decode('utf-8')
processes = [line for line in stdout.split('\n') if 'python' in line and line.split()[0]==username]
return processes
def cmd_confirmation(message=''):
confirm = ''
......@@ -19,6 +30,13 @@ def cmd_confirmation(message=''):
return confirm.lower() == 'y'
def newapp(args):
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
django.setup()
from django.conf import settings
from balsam.service import models
from balsam.launcher import dag
Job = models.BalsamJob
AppDef = models.ApplicationDefinition
def py_app_path(path):
if not path: return path
......@@ -53,6 +71,18 @@ def newapp(args):
def newjob(args):
os.environ['DJANGO_SETTINGS_MODULE'] = 'balsam.django_config.settings'
django.setup()
from django.conf import settings
from balsam.service import models
from balsam.launcher import dag
Job = models.BalsamJob
AppDef = models.ApplicationDefinition
BALSAM_SITE = settings.BALSAM_SITE
if not args.allowed_site:
args.allowed_site = [BALSAM_SITE]
if not AppDef.objects.filter(name=args.application).exists():
raise RuntimeError(f"App {args.application} not registered in local DB")