Commit 74502f8e authored by Michael Salim's avatar Michael Salim
Browse files

cleaned up logging. db_daemon now properly manages restarting/killing of server process

parent 8d2ed74a
......@@ -5,6 +5,7 @@ import os
import sys
import signal
import subprocess
import time
os.environ['IS_SERVER_DAEMON']="True"
......@@ -20,6 +21,7 @@ DB_COMMANDS = {
'postgres': f'',
'mysql' : f'',
}
term_start = 0
......@@ -29,10 +31,12 @@ def run(cmd):
return proc
def stop(proc):
print("Killing Balsam server process")
proc.terminate()
try: retcode = proc.wait(timeout=3)
except subprocess.TimeoutExpired: proc.kill()
print("Balsam server shutdown...", flush=True)
try: retcode = proc.wait(timeout=30)
except subprocess.TimeoutExpired:
print("Warning: server did not quit gracefully")
proc.kill()
def main(db_path):
......@@ -41,24 +45,18 @@ def main(db_path):
server_type = serverinfo['db_type']
db_cmd = f"BALSAM_DB_PATH={db_path} " + DB_COMMANDS[server_type].format(**serverinfo.data)
print(f"Starting balsam DB server daemon for DB at {db_path}")
print(f"\nStarting balsam DB server daemon for DB at {db_path}")
proc = run(db_cmd)
# On SIGINT/SIGTERM, start the clock & quit after TERM_LINGER sec
term_start = 0
def handle_term(signum, stack):
global term_start
if term_start == 0: term_start = time.time()
# On SIGUSR1, stop immediately ("balsam server --stop" does this)
def handle_stop(signum, stack):
stop(proc)
serverinfo['address'] = None
serverinfo.update({'address': None})
sys.exit(0)
signal.signal(signal.SIGINT, handle_term)
signal.signal(signal.SIGTERM, handle_term)
signal.signal(signal.SIGINT, handle_stop)
signal.signal(signal.SIGTERM, handle_stop)
signal.signal(signal.SIGUSR1, handle_stop)
while not term_start or time.time() - term_start < TERM_LINGER:
......@@ -67,7 +65,7 @@ def main(db_path):
except subprocess.TimeoutExpired:
pass
else:
print("server process stopped unexpectedly; restarting")
print("\nserver process stopped unexpectedly; restarting")
serverinfo.reset_server_address()
db_cmd = f"BALSAM_DB_PATH={db_path} " + DB_COMMANDS[server_type].format(**serverinfo.data)
proc = run(db_cmd)
......
......@@ -15,8 +15,12 @@ class ServerInfo:
self.refresh()
if self.data.get('address') and os.environ.get('IS_SERVER_DAEMON')=='True':
raise RuntimeError(f"Running server address is already posted at {self.path}"
' (use "balsam server --stop" to shut it down)')
raise RuntimeError(f"A running server address is already posted at {self.path}\n"
' Use "balsam dbserver --stop" to shut it down.\n'
' If you are sure there is no running server process, the'
' daemon did not have a clean shutdown.\n Use "balsam'
' dbserver --reset <balsam_db_directory>" to reset the server file'
)
def get_free_port_and_address(self):
hostname = socket.gethostname()
......
......@@ -61,15 +61,13 @@ DATABASES = configure_db_backend(BALSAM_PATH)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
LOGGING_DIRECTORY = os.path.join(BALSAM_PATH , 'log')
DATA_PATH = os.path.join(BALSAM_PATH ,'data')
BALSAM_WORK_DIRECTORY = os.path.join(DATA_PATH,'balsamjobs') # where to store local job data used for submission
ARGO_WORK_DIRECTORY = os.path.join(DATA_PATH,'argojobs')
BALSAM_WORK_DIRECTORY = DATA_PATH
for d in [
BALSAM_PATH ,
DATA_PATH,
LOGGING_DIRECTORY,
BALSAM_WORK_DIRECTORY,
ARGO_WORK_DIRECTORY
]:
if not os.path.exists(d):
os.makedirs(d)
......@@ -78,6 +76,7 @@ for d in [
# LOGGING SETUP
# ----------------
HANDLER_FILE = os.path.join(LOGGING_DIRECTORY, LOG_FILENAME)
BALSAM_DB_CONFIG_LOG = os.path.join(LOGGING_DIRECTORY, "balsamdb-config.log")
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
......@@ -101,6 +100,14 @@ LOGGING = {
'backupCount': LOG_BACKUP_COUNT,
'formatter': 'standard',
},
'balsam-db-config': {
'level':LOG_HANDLER_LEVEL,
'class':'logging.handlers.RotatingFileHandler',
'filename': BALSAM_DB_CONFIG_LOG,
'maxBytes': LOG_FILE_SIZE_LIMIT,
'backupCount': LOG_BACKUP_COUNT,
'formatter': 'standard',
},
'django': {
'level': LOG_HANDLER_LEVEL,
'class':'logging.handlers.RotatingFileHandler',
......@@ -121,6 +128,16 @@ LOGGING = {
'level': 'DEBUG',
'propagate': True,
},
'balsam.django_config': {
'handlers': ['balsam-db-config'],
'level': 'DEBUG',
'propagate': False,
},
'balsam.service.models': {
'handlers': ['balsam-db-config'],
'level': 'DEBUG',
'propagate': False,
},
}
}
......
......@@ -16,17 +16,25 @@ class Client:
self.logger = logging.getLogger(__name__)
self.server_info = server_info
self.serverAddr = self.server_info.get('address')
self.first_message = True
if self.serverAddr:
self.logger.debug(f"trying to reach DB write server at {self.serverAddr}")
response = self.send_request('TEST_ALIVE', timeout=3)
if response != 'ACK':
self.logger.exception(f"sqlite client cannot reach DB write server")
raise RuntimeError("Cannot reach DB write server")
try:
response = self.send_request('TEST_ALIVE', timeout=300)
except:
raise RuntimeError("Cannot reach server at {self.serverAddr}")
else:
if response != 'ACK':
self.logger.exception(f"sqlite client cannot reach DB write server")
raise RuntimeError("Cannot reach server at {self.serverAddr}")
def send_request(self, msg, timeout=None):
if timeout is None:
timeout = REQ_TIMEOUT
if self.first_message:
self.first_message = False
self.logger.debug(f"Connected to DB write server at {self.serverAddr}")
context = zmq.Context(1)
poll = zmq.Poller()
......@@ -52,6 +60,7 @@ class Client:
poll.unregister(client)
self.server_info.refresh()
self.serverAddr = self.server_info['address']
self.logger.debug(f"Connecting to DB write server at {self.serverAddr}")
context.term()
raise OperationalError(f"Sqlite client save request failed after "
......
......@@ -20,7 +20,8 @@ from balsam.django_config import serverinfo
logger = logging.getLogger('balsam.django_config.sqlite_server')
SERVER_PERIOD = 1000
TERM_LINGER = 20 # if SIGTERM, wait 20 sec after final save() to exit
TERM_LINGER = 3 # wait 3 sec after final save() to exit
terminate = False
class ZMQServer:
......@@ -65,11 +66,13 @@ class ZMQServer:
def server_main(db_path):
logger.debug("hello from server_main")
parent_pid = os.getppid()
global terminate
terminate = False
def handler(signum, stack):
global terminate
terminate = True
logger.debug("Got sigterm; will shut down soon")
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
......@@ -78,14 +81,15 @@ def server_main(db_path):
while not terminate or time.time() - last_save_time < TERM_LINGER:
message = server.recv_request()
logger.debug(f"received message: {message}")
if terminate:
logger.debug(f"shut down in {TERM_LINGER - (time.time()-last_save_time)} seconds")
if message is None:
if os.getppid() != parent_pid:
logger.info("detected parent died; server quitting")
break
logger.info("detected parent died; server quitting soon")
terminate = True
elif 'job_id' in message:
logger.debug("sending ACK_SAVE?")
logger.debug("sending ACK_SAVE")
last_save_time = server.save(message)
server.send_reply("ACK_SAVE")
else:
......
......@@ -306,6 +306,8 @@ def make_parser():
default=True, help="Start the DB server")
group.add_argument('--stop', action='store_true',
default=False, help="Kill the DB server")
group.add_argument('--reset', type=str,
default='', help="Balsam DB path at which to reset the address file")
parser_dbserver.add_argument('--path', type=str, default='',
help="Balsam DB directory path")
parser_dbserver.set_defaults(func=dbserver)
......
......@@ -361,8 +361,20 @@ def service(args):
print("dummy -- invoking balsam metascheduler service")
def dbserver(args):
from balsam.django_config import serverinfo
fname = find_spec("balsam.django_config.db_daemon").origin
if args.reset:
path = os.path.join(args.reset, serverinfo.ADDRESS_FNAME)
if not os.path.exists(path):
print("No db address file at reset path")
sys.exit(0)
else:
info = serverinfo.ServerInfo(args.reset)
info.update({'address': None})
print("Reset done")
sys.exit(0)
if args.stop:
server_pids = [int(line.split()[1]) for line in ls_procs('db_daemon')]
if not server_pids:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment