Commit b5aad9ba authored by Pavan Balaji's avatar Pavan Balaji
Browse files

[svn-r4019] Added a connection setup between the job launcher and the proxies in

case of an abnormal exit, so that the runaway processes can be cleaned
up properly (based on the PID, instead of the executable name).

Algorithm: Each proxy keeps track of the PIDs of the processes it
launches and listens on a socket for incoming connections from the job
launcher. If the exit is clean, this socket is not used at all. However,
if the job launcher wants to kill the application (due to a timeout,
or an abort by another process in the application), a connection is
established on this socket and a message sent to the proxy to kill its
corresponding processes. We only support one command right now (KILLALL).

This should resolve ticket #447.
parent 27decdef
......@@ -10,7 +10,6 @@
#include "hydra.h"
HYD_Status HYD_BSCI_Launch_procs(void);
HYD_Status HYD_BSCI_Cleanup_procs(void);
HYD_Status HYD_BSCI_Get_universe_size(int *size);
HYD_Status HYD_BSCI_Wait_for_completion(void);
HYD_Status HYD_BSCI_Finalize(void);
......
......@@ -27,12 +27,6 @@ HYD_Status HYD_BSCI_Launch_procs(void)
HYDU_FUNC_ENTER();
status = HYD_BSCU_Set_common_signals(HYD_BSCU_Signal_handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("signal utils returned error when trying to set signal\n");
goto fn_fail;
}
/* FIXME: Instead of directly reading from the HYD_Handle
* structure, the upper layers should be able to pass what exactly
* they want launched. Without this functionality, the proxy
......@@ -97,56 +91,3 @@ HYD_Status HYD_BSCI_Launch_procs(void)
fn_fail:
goto fn_exit;
}
/*
 * HYD_BSCI_Cleanup_procs: For every partition of every entry in
 * handle.proc_params, spawn "/usr/bin/ssh -x <partition name> killall
 * <executable>", where <executable> is the last path component of
 * exec[0].
 *
 * Returns HYD_SUCCESS when every ssh process was spawned. On the
 * first HYDU_Create_process failure the remaining partitions are not
 * attempted and that failure status is returned.
 */
HYD_Status HYD_BSCI_Cleanup_procs(void)
{
    struct HYD_Proc_params *proc_params;
    struct HYD_Partition_list *partition;
    char *client_arg[HYD_EXEC_ARGS], *execname;
    int arg;
    HYD_Status status = HYD_SUCCESS;

    HYDU_FUNC_ENTER();

    /* Keep the argument list NULL-terminated at all times, so that
     * the failure path can always walk it and release what is there. */
    client_arg[0] = NULL;

    for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
        for (partition = proc_params->partition; partition; partition = partition->next) {
            /* Setup the executable arguments */
            arg = 0;
            client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
            client_arg[arg++] = MPIU_Strdup("-x");

            /* ssh does not support any partition names other than host names */
            client_arg[arg++] = MPIU_Strdup(partition->name);

            client_arg[arg++] = MPIU_Strdup("killall");

            /* killall matches on the bare executable name, so strip
             * any leading directory components. */
            execname = strrchr(proc_params->exec[0], '/');
            if (!execname)
                execname = proc_params->exec[0];
            else
                execname++;
            client_arg[arg++] = MPIU_Strdup(execname);
            client_arg[arg] = NULL;

            status = HYDU_Create_process(client_arg, NULL, NULL, NULL, NULL);
            if (status != HYD_SUCCESS) {
                HYDU_Error_printf("bootstrap spawn process returned error\n");
                goto fn_fail;
            }

            for (arg = 0; client_arg[arg]; arg++) {
                HYDU_FREE(client_arg[arg]);
            }
            client_arg[0] = NULL;
        }
    }

  fn_exit:
    HYDU_FUNC_EXIT();
    return status;

  fn_fail:
    /* Release the argument strings of the spawn that failed; the
     * success path has already released all earlier ones. */
    for (arg = 0; client_arg[arg]; arg++) {
        HYDU_FREE(client_arg[arg]);
    }
    goto fn_exit;
}
......@@ -7,7 +7,7 @@
HYDRA_LIB_PATH = ../../lib
libhydra_a_DIR = ${HYDRA_LIB_PATH}
libhydra_a_SOURCES = bscu_wait.c bscu_signal.c
libhydra_a_SOURCES = bscu_wait.c
INCLUDES = -I${abs_srcdir}/../../include \
-I${abs_srcdir}/../../../../include \
-I../../include \
......
......@@ -12,7 +12,5 @@
#include "bsci.h"
HYD_Status HYD_BSCU_Wait_for_completion(void);
HYD_Status HYD_BSCU_Set_common_signals(void (*handler) (int));
void HYD_BSCU_Signal_handler(int signal);
#endif /* BSCI_H_INCLUDED */
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* (C) 2008 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "hydra.h"
#include "hydra_utils.h"
#include "bsci.h"
#include "bscu.h"
/*
 * HYD_BSCU_Set_common_signals: Install the same handler for SIGINT,
 * SIGQUIT, SIGTERM and (when defined) SIGCONT.
 *
 * handler: function to be installed through HYDU_Set_signal.
 *
 * Returns HYD_SUCCESS when every registration succeeded; otherwise
 * the status of the first HYDU_Set_signal call that failed (later
 * signals are then left untouched).
 */
HYD_Status HYD_BSCU_Set_common_signals(void (*handler) (int))
{
    HYD_Status status = HYD_SUCCESS;

    HYDU_FUNC_ENTER();

    status = HYDU_Set_signal(SIGINT, handler);
    if (status != HYD_SUCCESS) {
        HYDU_Error_printf("signal utils returned error when trying to set SIGINT signal\n");
        goto fn_fail;
    }

    status = HYDU_Set_signal(SIGQUIT, handler);
    if (status != HYD_SUCCESS) {
        HYDU_Error_printf("signal utils returned error when trying to set SIGQUIT signal\n");
        goto fn_fail;
    }

    status = HYDU_Set_signal(SIGTERM, handler);
    if (status != HYD_SUCCESS) {
        HYDU_Error_printf("signal utils returned error when trying to set SIGTERM signal\n");
        goto fn_fail;
    }

    /* SIGSTOP is deliberately not registered: POSIX does not allow
     * SIGSTOP (or SIGKILL) to be caught or ignored, so trying to
     * install a handler for it can only fail. */

#if defined SIGCONT
    status = HYDU_Set_signal(SIGCONT, handler);
    if (status != HYD_SUCCESS) {
        HYDU_Error_printf("signal utils returned error when trying to set SIGCONT signal\n");
        goto fn_fail;
    }
#endif /* SIGCONT */

  fn_exit:
    HYDU_FUNC_EXIT();
    return status;

  fn_fail:
    goto fn_exit;
}
/*
 * HYD_BSCU_Signal_handler: Signal handler that cleans up the launched
 * processes and terminates the caller.
 *
 * signal: number of the signal being delivered.
 *
 * For SIGINT, SIGQUIT, SIGTERM and (when defined) SIGSTOP/SIGCONT,
 * HYD_BSCI_Cleanup_procs is called and the process exits with status
 * -1. Any other signal is ignored and the handler returns.
 *
 * NOTE(review): exit() is not on the POSIX list of async-signal-safe
 * functions; confirm this is acceptable for a handler.
 */
void HYD_BSCU_Signal_handler(int signal)
{
    HYDU_FUNC_ENTER();

    switch (signal) {
    case SIGINT:
    case SIGQUIT:
    case SIGTERM:
#if defined SIGSTOP
    case SIGSTOP:
#endif /* SIGSTOP */
#if defined SIGCONT
    case SIGCONT:
#endif /* SIGCONT */
        /* There's nothing we can do with the return value for now. */
        HYD_BSCI_Cleanup_procs();
        exit(-1);
        break;  /* not reached */

    default:
        /* Ignore other signals for now */
        break;
    }

    HYDU_FUNC_EXIT();
    return;
}
......@@ -63,13 +63,8 @@ HYD_Status HYD_BSCU_Wait_for_completion(void)
* and should not be used. */
} while (not_completed > 0);
if (not_completed) {
status = HYD_BSCI_Cleanup_procs();
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap process cleanup failed\n");
goto fn_fail;
}
}
if (not_completed)
status = HYD_INTERNAL_ERROR;
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -16,5 +16,4 @@ INCLUDES = -I${abs_srcdir}/../../include \
-I${abs_srcdir}/../include \
-I${abs_srcdir}/../utils \
-I${abs_srcdir}/../../pm/include \
-I${abs_srcdir}/../../bootstrap/include \
-I${abs_srcdir}/../../demux
......@@ -8,7 +8,6 @@
#include "hydra_utils.h"
#include "csi.h"
#include "pmci.h"
#include "bsci.h"
#include "demux.h"
HYD_Handle handle;
......
......@@ -7,7 +7,6 @@
#include "hydra.h"
#include "csi.h"
#include "pmci.h"
#include "bsci.h"
#include "demux.h"
HYD_Status HYD_CSI_Finalize(void)
......@@ -22,12 +21,6 @@ HYD_Status HYD_CSI_Finalize(void)
goto fn_fail;
}
status = HYD_BSCI_Finalize();
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap server finalize returned an error\n");
goto fn_fail;
}
status = HYD_DMX_Finalize();
if (status != HYD_SUCCESS) {
HYDU_Error_printf("demux engine finalize returned an error\n");
......
......@@ -8,7 +8,6 @@
#include "csi.h"
#include "csiu.h"
#include "pmci.h"
#include "bsci.h"
#include "demux.h"
HYD_Handle handle;
......@@ -47,11 +46,11 @@ HYD_Status HYD_CSI_Wait_for_completion(void)
if (sockets_open && HYDU_Time_left(handle.start, handle.timeout))
continue;
/* Make sure all the processes have terminated. The bootstrap
* control device will take care of that. */
status = HYD_BSCI_Wait_for_completion();
/* Make sure all the processes have terminated. The process
* manager control device will take care of that. */
status = HYD_PMCI_Wait_for_completion();
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap server returned error when waiting for completion\n");
HYDU_Error_printf("process manager returned error when waiting for completion\n");
goto fn_fail;
}
......
......@@ -15,5 +15,4 @@ INCLUDES = -I${abs_srcdir}/../../include \
-I${abs_srcdir}/../../launcher/utils \
-I${abs_srcdir}/../include \
-I${abs_srcdir}/../../pm/include \
-I${abs_srcdir}/../../bootstrap/include \
-I${abs_srcdir}/../../demux
......@@ -137,8 +137,11 @@ HYD_Status HYD_DMX_Wait_for_event(int time)
ret = poll(pollfds, total_fds, time);
if (ret < 0) {
if (errno == EINTR) {
/* We were interrupted by a system call; loop back */
continue;
/* The poll() call was interrupted by a signal; this is not
* an error case in the regular sense, but the upper
* layer needs to gracefully clean up the processes. */
status = HYD_SUCCESS;
goto fn_exit;
}
HYDU_Error_printf("poll error (errno: %d)\n", errno);
status = HYD_SOCK_ERROR;
......
......@@ -12,12 +12,13 @@
#include "hydra_utils.h"
struct HYD_Handle_ {
char *base_path;
int proxy_port;
char *boot_server;
int debug;
int enablex;
char *wdir;
char *base_path;
char *host_file;
/* Global environment */
......
......@@ -51,6 +51,8 @@
extern char **environ;
#endif /* MANUAL_EXTERN_ENVIRON */
#define HYD_DEFAULT_PROXY_PORT 9899
typedef enum {
HYD_SUCCESS = 0,
HYD_NO_MEM,
......
......@@ -116,6 +116,7 @@ extern char *strsignal(int);
#endif
HYD_Status HYDU_Set_signal(int signum, void (*handler) (int));
HYD_Status HYDU_Set_common_signals(void (*handler) (int));
/* Timer utilities */
......
......@@ -25,6 +25,7 @@ static void usage(void)
printf("\t-genvnone [Do not pass any environment variables]\n");
printf("\t-genvall [Pass all environment variables (default)]\n");
printf("\t-f {name} [File containing the host names]\n");
printf("\t--proxy-port [Port on which proxies can listen]\n");
printf("\n");
......
......@@ -108,6 +108,7 @@ HYD_Status HYD_LCHI_Get_parameters(int t_argc, char **t_argv)
handle.enablex = -1;
handle.wdir = NULL;
handle.host_file = NULL;
handle.proxy_port = -1;
status = HYDU_Get_base_path(argv[0], &handle.base_path);
if (status != HYD_SUCCESS) {
......@@ -175,6 +176,22 @@ HYD_Status HYD_LCHI_Get_parameters(int t_argc, char **t_argv)
continue;
}
/* Check if the proxy port is set */
if (!strcmp(*argv, "--proxy-port")) {
CHECK_LOCAL_PARAM_START(local_params_started, status);
CHECK_NEXT_ARG_VALID(status);
if (handle.proxy_port != -1) {
HYDU_Error_printf("Duplicate proxy port setting; previously set to %d\n",
handle.proxy_port);
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
handle.proxy_port = atoi(*argv);
continue;
}
/* Check what all environment variables need to be propagated */
if (!strcmp(*argv, "-genvall") || !strcmp(*argv, "-genvnone") ||
!strcmp(*argv, "-genvlist")) {
......@@ -420,6 +437,11 @@ HYD_Status HYD_LCHI_Get_parameters(int t_argc, char **t_argv)
proc_params = proc_params->next;
}
/* If the proxy port is not set, set it to the default value */
if (handle.proxy_port == -1) {
handle.proxy_port = HYD_DEFAULT_PROXY_PORT;
}
fn_exit:
HYDU_FUNC_EXIT();
return status;
......
......@@ -15,5 +15,7 @@ enum HYD_Proxy_cmds {
extern int HYD_PMCD_Central_listenfd;
HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events);
HYD_Status HYD_PMCD_Central_cleanup(void);
void HYD_PMCD_Central_signal_cb(int signal);
#endif /* CENTRAL_H_INCLUDED */
......@@ -13,6 +13,7 @@
#include "central.h"
int HYD_PMCD_Central_listenfd;
HYD_Handle handle;
/*
* HYD_PMCD_Central_cb: This is the core PMI server part of the
......@@ -58,6 +59,14 @@ HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events)
goto fn_fail;
}
/* Make this socket non-blocking as we should not keep waiting
* for data on the PMI connections. */
status = HYDU_Sock_set_nonblock(accept_fd);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("sock utils returned error setting socket to non-blocking\n");
goto fn_fail;
}
status = HYD_DMX_Register_fd(1, &accept_fd, HYD_STDOUT, HYD_PMCD_Central_cb);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("demux engine returned error when registering fd\n");
......@@ -75,7 +84,7 @@ HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events)
/* This is not a clean close. If a finalize was called, we
* would have deregistered this socket. The application
* might have aborted. Just cleanup all the processes */
status = HYD_BSCI_Cleanup_procs();
status = HYD_PMCD_Central_cleanup();
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap server returned error cleaning up processes\n");
goto fn_fail;
......@@ -151,8 +160,7 @@ HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events)
/* Cleanup all the processes and return. We don't need to
* check the return status since we are anyway returning
* an error */
HYD_BSCI_Cleanup_procs();
HYD_PMCD_Central_cleanup();
status = HYD_INTERNAL_ERROR;
goto fn_fail;
}
......@@ -171,3 +179,68 @@ HYD_Status HYD_PMCD_Central_cb(int fd, HYD_Event_t events)
fn_fail:
goto fn_exit;
}
/*
 * HYD_PMCD_Central_cleanup: Ask every proxy to kill the processes it
 * launched.
 *
 * For each partition of each entry in handle.proc_params, a
 * connection is opened to partition->name on handle.proxy_port, the
 * KILLALL_PROCS command is written to it and the connection is
 * closed again.
 *
 * A proxy that cannot be reached or written to does not stop the
 * loop; the remaining proxies are still contacted.
 *
 * Returns HYD_SUCCESS if every proxy was notified, and
 * HYD_INTERNAL_ERROR if the connect or the write failed for at least
 * one of them.
 */
HYD_Status HYD_PMCD_Central_cleanup(void)
{
    struct HYD_Proc_params *proc_params;
    struct HYD_Partition_list *partition;
    int fd;
    enum HYD_Proxy_cmds cmd;
    HYD_Status status = HYD_SUCCESS, overall_status = HYD_SUCCESS;

    HYDU_FUNC_ENTER();

    /* FIXME: Instead of doing this from this process itself, fork a
     * bunch of processes to do this. */
    /* Connect to all proxies and send a KILL command */
    cmd = KILLALL_PROCS;
    for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
        for (partition = proc_params->partition; partition; partition = partition->next) {
            status = HYDU_Sock_connect(partition->name, handle.proxy_port, &fd);
            if (status != HYD_SUCCESS) {
                overall_status = HYD_INTERNAL_ERROR;
                HYDU_Error_printf("unable to connect to the proxy on %s\n", partition->name);
                continue;       /* Move on to the next proxy */
            }

            status = HYDU_Sock_write(fd, &cmd, sizeof(cmd));
            if (status != HYD_SUCCESS) {
                overall_status = HYD_INTERNAL_ERROR;
                HYDU_Error_printf("unable to send data to the proxy on %s\n", partition->name);
                /* Fall through: the descriptor must be closed even
                 * when the write failed, otherwise it is leaked. */
            }

            close(fd);
        }
    }

    HYDU_FUNC_EXIT();
    return overall_status;
}
/*
 * HYD_PMCD_Central_signal_cb: Signal callback that asks the proxies
 * to kill their processes and then terminates the caller.
 *
 * signal: number of the signal being delivered.
 *
 * For SIGINT, SIGQUIT, SIGTERM and (when defined) SIGSTOP/SIGCONT,
 * HYD_PMCD_Central_cleanup is called and the process exits with
 * status -1. Any other signal is ignored and the callback returns.
 *
 * NOTE(review): exit() is not on the POSIX list of async-signal-safe
 * functions; confirm this is acceptable for a handler.
 */
void HYD_PMCD_Central_signal_cb(int signal)
{
    HYDU_FUNC_ENTER();

    switch (signal) {
    case SIGINT:
    case SIGQUIT:
    case SIGTERM:
#if defined SIGSTOP
    case SIGSTOP:
#endif /* SIGSTOP */
#if defined SIGCONT
    case SIGCONT:
#endif /* SIGCONT */
        /* There's nothing we can do with the return value for now. */
        HYD_PMCD_Central_cleanup();
        exit(-1);
        break;  /* not reached */

    default:
        /* Ignore other signals for now */
        break;
    }

    HYDU_FUNC_EXIT();
    return;
}
......@@ -36,6 +36,12 @@ HYD_Status HYD_PMCI_Finalize(void)
goto fn_fail;
}
status = HYD_BSCI_Finalize();
if (status != HYD_SUCCESS) {
HYDU_Error_printf("unable to finalize the bootstrap server\n");
goto fn_fail;
}
fn_exit:
HYDU_FUNC_EXIT();
return status;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment