Commit 6d08e262 authored by Pavan Balaji's avatar Pavan Balaji
Browse files

[svn-r5891] Revamp the bootstrap interface (and the current implementations)
so that it does not deal with the entire proxy structure. Parts that the
bootstrap server creates and destroys (stdio fds, pids) should not be exposed
to the upper layers.
parent 835bf007
......@@ -71,7 +71,7 @@ abs_srcdir=`(cd $srcdir && pwd)`
# Check if the necessary headers are available
AC_CHECK_HEADERS(unistd.h stdlib.h string.h strings.h stdarg.h sys/types.h sys/socket.h \
sched.h pthread.h sys/stat.h sys/param.h netinet/in.h netinet/tcp.h \
sys/un.h netdb.h)
sys/un.h netdb.h sys/time.h time.h)
# Check if the pthread library is present. Apparently, just checking
# for pthread.h is not sufficient.
......
......@@ -32,11 +32,12 @@ struct HYD_handle {
HYD_status(*stdout_cb) (int fd, HYD_event_t events, void *userp);
HYD_status(*stderr_cb) (int fd, HYD_event_t events, void *userp);
/* Start time and timeout. These are filled in by the UI, but are
* utilized by the demux engine and the boot-strap server to
* decide how long we need to wait for. */
HYD_time start;
HYD_time timeout;
/* Timeout (in seconds) is filled in by the UI to be passed to the
* bootstrap server.
*
* FIXME: make this a function parameter.
*/
int timeout;
/* All of the available nodes */
struct HYD_node *node_list;
......
......@@ -38,13 +38,18 @@
#include <sys/stat.h>
#endif /* HAVE_SYS_STAT_H */
#include <errno.h>
#if defined HAVE_GETTIMEOFDAY
/* FIXME: Is time.h available everywhere? We should probably have
* multiple timer options. */
#if defined HAVE_TIME_H
#include <time.h>
#endif /* HAVE_TIME_H */
#if defined HAVE_SYS_TIME_H
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
#include <errno.h>
#if !defined HAVE_GETTIMEOFDAY
#error "hydra requires gettimeofday support"
#endif /* HAVE_GETTIMEOFDAY */
#if defined MAXHOSTNAMELEN
......@@ -102,6 +107,7 @@ typedef enum {
HYD_NO_MEM,
HYD_SOCK_ERROR,
HYD_INVALID_PARAM,
HYD_TIMED_OUT,
HYD_INTERNAL_ERROR
} HYD_status;
......@@ -176,11 +182,6 @@ struct HYD_proxy {
int proxy_id;
int pid;
int in; /* stdin is only valid for proxy_id 0 */
int out;
int err;
int start_pid;
int proxy_process_count;
......
......@@ -129,7 +129,7 @@ void HYDU_free_proxy_list(struct HYD_proxy *proxy_list);
HYD_status HYDU_alloc_proxy_exec(struct HYD_proxy_exec **exec);
/* args */
HYD_status HYDU_find_in_path(const char *execname, char **path);
char *HYDU_find_full_path(const char *execname);
char *HYDU_getcwd(void);
HYD_status HYDU_get_base_path(const char *execname, char *wdir, char **path);
HYD_status HYDU_parse_hostfile(char *hostfile,
......@@ -172,8 +172,6 @@ HYD_status HYDU_join_thread(struct HYD_thread_context ctxt);
#endif /* HAVE_THREAD_SUPPORT */
/* others */
HYD_status HYDU_merge_proxy_segment(char *name, int start_pid, int core_count,
struct HYD_proxy **proxy_list);
int HYDU_local_to_global_id(int local_id, int start_pid, int core_count,
int global_core_count);
HYD_status HYDU_add_to_node_list(char *hostname, int num_procs, struct HYD_node **node_list);
......@@ -274,13 +272,4 @@ char *HYDU_strerror(int error);
int HYDU_strlist_lastidx(char **strlist);
char **HYDU_str_to_strlist(char *str);
/* Timer utilities */
/* FIXME: HYD_time should be OS specific */
#ifdef HAVE_TIME
#include <time.h>
#endif /* HAVE_TIME */
typedef struct timeval HYD_time;
void HYDU_time_set(HYD_time * time, int *val);
int HYDU_time_left(HYD_time start, HYD_time timeout);
#endif /* HYDRA_UTILS_H_INCLUDED */
......@@ -15,82 +15,6 @@
static char *pmi_port_str = NULL;
static char *proxy_port_str = NULL;
/*
 * launch_helper - connect the launcher to one already-running proxy.
 *
 * args is cast to a struct HYD_proxy *. The function opens a control
 * connection to proxy->node.hostname on HYD_handle.proxy_port (stored in
 * proxy->control_fd), sends the executable information over it, and then
 * opens one extra connection each for stdout and stderr (plus stdin when
 * proxy_id is 0), tagging every extra connection by writing a
 * USE_AS_STDOUT / USE_AS_STDERR / USE_AS_STDIN command on it.
 *
 * The signature matches a thread start routine; the caller is not visible
 * here -- TODO confirm how it is invoked.
 *
 * Always returns NULL. The local `status` is never handed back, so a
 * failure is not visible through the return value.
 */
static void *launch_helper(void *args)
{
struct HYD_proxy *proxy = (struct HYD_proxy *) args;
enum HYD_pmcd_pmi_proxy_cmds cmd;
HYD_status status = HYD_SUCCESS;
/*
* Here are the steps we will follow:
*
* 1. Put all the arguments to pass in to a string list.
*
* 2. Connect to the proxy (this will be our primary control
* socket).
*
* 3. Read this string list and write the following to the socket:
* (a) The PROC_INFO command.
* (b) Integer sized data with the number of arguments to
* follow.
* (c) For each argument to pass, first send an integer which
* tells the proxy how many bytes are coming in that
* argument.
*
* 4. Open two new sockets and connect them to the proxy.
*
* 5. On the first new socket, send USE_AS_STDOUT and the second
* send USE_AS_STDERR.
*
* 6. For the first process, open a separate socket and send the
* USE_AS_STDIN command on it.
*
* 7. We need to figure out what to do with the LAUNCH_JOB
* command; since it's going on a different socket, it might go
* out-of-order. Maybe a state machine on the proxy to see if
* it got all the information it needs to launch the job would
* work.
*/
/* Step 2: primary control connection, kept in proxy->control_fd. */
status = HYDU_sock_connect(proxy->node.hostname, HYD_handle.proxy_port, &proxy->control_fd);
HYDU_ERR_POP(status, "unable to connect to proxy\n");
/* Step 3 is delegated to HYD_pmcd_pmi_send_exec_info(); its wire format
 * is not visible in this function. */
status = HYD_pmcd_pmi_send_exec_info(proxy);
HYDU_ERR_POP(status, "error sending executable info\n");
/* Create an stdout socket */
status = HYDU_sock_connect(proxy->node.hostname, HYD_handle.proxy_port, &proxy->out);
HYDU_ERR_POP(status, "unable to connect to proxy\n");
/* The first thing written on the new connection tells the proxy what
 * the connection is for. */
cmd = USE_AS_STDOUT;
status = HYDU_sock_write(proxy->out, &cmd, sizeof(enum HYD_pmcd_pmi_proxy_cmds));
HYDU_ERR_POP(status, "unable to write data to proxy\n");
/* Create an stderr socket */
status = HYDU_sock_connect(proxy->node.hostname, HYD_handle.proxy_port, &proxy->err);
HYDU_ERR_POP(status, "unable to connect to proxy\n");
cmd = USE_AS_STDERR;
status = HYDU_sock_write(proxy->err, &cmd, sizeof(enum HYD_pmcd_pmi_proxy_cmds));
HYDU_ERR_POP(status, "unable to write data to proxy\n");
/* If rank 0 is here, create an stdin socket */
if (proxy->proxy_id == 0) {
status = HYDU_sock_connect(proxy->node.hostname, HYD_handle.proxy_port, &proxy->in);
HYDU_ERR_POP(status, "unable to connect to proxy\n");
cmd = USE_AS_STDIN;
status = HYDU_sock_write(proxy->in, &cmd, sizeof(enum HYD_pmcd_pmi_proxy_cmds));
HYDU_ERR_POP(status, "unable to write data to proxy\n");
}
/* No LAUNCH_JOB command (step 7 above) is sent from this function. */
fn_exit:
/* NULL is returned on both the success and the failure path. */
return NULL;
fn_fail:
/* NOTE(review): presumably reached via HYDU_ERR_POP on a non-success
 * status -- confirm against the macro definition. Connections opened
 * before the failure are not closed here. */
goto fn_exit;
}
static HYD_status
create_and_listen_portstr(HYD_status(*callback) (int fd, HYD_event_t events, void *userp),
char **port_str)
......@@ -179,6 +103,7 @@ static HYD_status fill_in_proxy_args(char *mode, char **proxy_args)
proxy_args[arg++] = HYDU_strdup(HYD_handle.user_global.bootstrap_exec);
}
proxy_args[arg++] = HYDU_strdup("--proxy-id");
proxy_args[arg++] = NULL;
fn_exit:
......@@ -358,6 +283,7 @@ static HYD_status fill_in_exec_launch_info(void)
HYD_status HYD_pmci_launch_procs(void)
{
struct HYD_proxy *proxy;
struct HYD_node *node_list, *node, *tnode;
enum HYD_pmcd_pmi_proxy_cmds cmd;
int fd;
char *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL };
......@@ -377,12 +303,30 @@ HYD_status HYD_pmci_launch_procs(void)
status = HYD_pmcd_pmi_init();
HYDU_ERR_POP(status, "unable to create process group\n");
/* Copy the host list to pass to the bootstrap server */
node_list = NULL;
for (proxy = HYD_handle.pg_list.proxy_list; proxy; proxy = proxy->next) {
HYDU_alloc_node(&node);
node->hostname = HYDU_strdup(proxy->node.hostname);
node->core_count = proxy->node.core_count;
node->next = NULL;
if (node_list == NULL) {
node_list = node;
}
else {
for (tnode = node_list; tnode->next; tnode = tnode->next);
tnode->next = node;
}
}
if (!strcmp(HYD_handle.user_global.launch_mode, "boot") ||
!strcmp(HYD_handle.user_global.launch_mode, "boot-debug")) {
status = fill_in_proxy_args(HYD_handle.user_global.launch_mode, proxy_args);
HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
status = HYDT_bsci_launch_procs(proxy_args, "--proxy-id", HYD_handle.pg_list.proxy_list);
status = HYDT_bsci_launch_procs(proxy_args, node_list, NULL, HYD_handle.stdin_cb,
HYD_handle.stdout_cb, HYD_handle.stderr_cb);
HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
}
else if (!strcmp(HYD_handle.user_global.launch_mode, "shutdown")) {
......@@ -414,7 +358,8 @@ HYD_status HYD_pmci_launch_procs(void)
status = fill_in_exec_launch_info();
HYDU_ERR_POP(status, "unable to fill in executable arguments\n");
status = HYDT_bsci_launch_procs(proxy_args, "--proxy-id", HYD_handle.pg_list.proxy_list);
status = HYDT_bsci_launch_procs(proxy_args, node_list, NULL, HYD_handle.stdin_cb,
HYD_handle.stdout_cb, HYD_handle.stderr_cb);
HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
}
......@@ -424,6 +369,7 @@ HYD_status HYD_pmci_launch_procs(void)
if (proxy_port_str)
HYDU_FREE(proxy_port_str);
HYDU_free_strlist(proxy_args);
HYDU_free_node_list(node_list);
HYDU_FUNC_EXIT();
return status;
......@@ -434,67 +380,15 @@ HYD_status HYD_pmci_launch_procs(void)
HYD_status HYD_pmci_wait_for_completion(void)
{
struct HYD_proxy *proxy;
int sockets_open, all_procs_exited, infinite = -1;
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
if (strcmp(HYD_handle.user_global.launch_mode, "boot") &&
strcmp(HYD_handle.user_global.launch_mode, "shutdown")) {
while (1) {
/* Wait for some event to occur */
status =
HYDT_dmx_wait_for_event(HYDU_time_left(HYD_handle.start, HYD_handle.timeout));
HYDU_ERR_POP(status, "error waiting for event\n");
/* timeout expired */
if (HYDU_time_left(HYD_handle.start, HYD_handle.timeout) == 0) {
status = HYD_pmcd_pmi_serv_cleanup();
HYDU_ERR_POP(status, "cleanup of processes failed\n");
/* Reset timer to infinite */
HYDU_time_set(&HYD_handle.timeout, &infinite);
continue;
}
/* Check to see if there's any open read socket left; if
* there are, we will just wait for more events. */
sockets_open = 0;
for (proxy = HYD_handle.pg_list.proxy_list; proxy; proxy = proxy->next) {
if (proxy->out != -1 || proxy->err != -1) {
sockets_open++;
break;
}
}
if (sockets_open && HYDU_time_left(HYD_handle.start, HYD_handle.timeout))
continue;
break;
}
do {
/* Check if the exit status has already arrived */
all_procs_exited = 1;
for (proxy = HYD_handle.pg_list.proxy_list; proxy; proxy = proxy->next) {
if (proxy->exit_status == NULL) {
all_procs_exited = 0;
break;
}
}
if (all_procs_exited)
break;
/* If not, wait for some event to occur */
status =
HYDT_dmx_wait_for_event(HYDU_time_left(HYD_handle.start, HYD_handle.timeout));
HYDU_ERR_POP(status, "error waiting for event\n");
} while (1);
status = HYDT_bsci_wait_for_completion(HYD_handle.timeout);
if (status == HYD_TIMED_OUT) {
status = HYD_pmcd_pmi_serv_cleanup();
HYDU_ERR_POP(status, "cleanup of processes failed\n");
}
status = HYDT_bsci_wait_for_completion(HYD_handle.pg_list.proxy_list);
HYDU_ERR_POP(status, "bootstrap server returned error waiting for completion\n");
fn_exit:
......
......@@ -21,7 +21,7 @@
*/
struct HYD_rmki_fns {
/** \brief Query node list information */
/* Two signatures for the same member appear here: one reports a core
 * count plus a list of struct HYD_proxy, the other reports only a list
 * of struct HYD_node. NOTE(review): presumably the first is the
 * pre-commit line and the second its replacement -- confirm. */
HYD_status(*query_node_list) (int *num_cores, struct HYD_proxy **proxy_list);
HYD_status(*query_node_list) (struct HYD_node **node_list);
};
/** \cond */
......@@ -40,11 +40,9 @@ HYD_status HYD_rmki_init(char *rmk);
/**
* \brief HYD_rmki_query_node_list - Query node list information
*
* \param[in,out] num_cores Number of cores available (non-zero cores means
* that the RMK should allocate the nodes if needed)
* \param[out] proxy_list List of proxy structures containing the list of hosts
* \param[out] node_list List of nodes available
*/
HYD_status HYD_rmki_query_node_list(int *num_cores, struct HYD_proxy **proxy_list);
HYD_status HYD_rmki_query_node_list(struct HYD_node **node_list);
/*!
* @}
......
......@@ -9,6 +9,6 @@
#include "hydra_base.h"
HYD_status HYD_rmkd_pbs_query_node_list(int *num_cores, struct HYD_proxy **proxy_list);
HYD_status HYD_rmkd_pbs_query_node_list(struct HYD_node **node_list);
#endif /* RMK_PBS_H_INCLUDED */
......@@ -9,6 +9,7 @@
#include "rmk_pbs.h"
static int total_num_procs;
static struct HYD_node *global_node_list = NULL;
static HYD_status process_mfile_token(char *token, int newline)
{
......@@ -21,7 +22,7 @@ static HYD_status process_mfile_token(char *token, int newline)
procs = strtok(NULL, ":");
num_procs = procs ? atoi(procs) : 1;
status = HYDU_add_to_node_list(hostname, num_procs, &HYD_handle.node_list);
status = HYDU_add_to_node_list(hostname, num_procs, &global_node_list);
HYDU_ERR_POP(status, "unable to initialize proxy\n");
total_num_procs += num_procs;
......@@ -38,7 +39,7 @@ static HYD_status process_mfile_token(char *token, int newline)
goto fn_exit;
}
HYD_status HYD_rmkd_pbs_query_node_list(int *num_cores, struct HYD_proxy **proxy_list)
HYD_status HYD_rmkd_pbs_query_node_list(struct HYD_node **node_list)
{
char *hostfile;
HYD_status status = HYD_SUCCESS;
......@@ -47,14 +48,14 @@ HYD_status HYD_rmkd_pbs_query_node_list(int *num_cores, struct HYD_proxy **proxy
hostfile = getenv("PBS_NODEFILE");
if (hostfile == NULL) {
*proxy_list = NULL;
*node_list = NULL;
}
else {
total_num_procs = 0;
status = HYDU_parse_hostfile(hostfile, process_mfile_token);
HYDU_ERR_POP(status, "error parsing hostfile\n");
*num_cores = total_num_procs;
}
*node_list = global_node_list;
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -7,13 +7,13 @@
#include "hydra_base.h"
#include "rmki.h"
HYD_status HYD_rmki_query_node_list(int *num_cores, struct HYD_proxy **proxy_list)
HYD_status HYD_rmki_query_node_list(struct HYD_node **node_list)
{
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
status = HYD_rmki_fns.query_node_list(num_cores, proxy_list);
status = HYD_rmki_fns.query_node_list(node_list);
HYDU_ERR_POP(status, "RMK device returned error while querying node list\n");
fn_exit:
......
......@@ -9,6 +9,6 @@
#include "hydra_base.h"
HYD_status HYD_rmku_query_node_list(int *num_cores, struct HYD_proxy **proxy_list);
HYD_status HYD_rmku_query_node_list(struct HYD_node **node_list);
#endif /* RMKU_H_INCLUDED */
......@@ -9,7 +9,7 @@
#include "bsci.h"
#include "rmku.h"
HYD_status HYD_rmku_query_node_list(int *num_cores, struct HYD_proxy **proxy_list)
HYD_status HYD_rmku_query_node_list(struct HYD_node **node_list)
{
HYD_status status = HYD_SUCCESS;
......@@ -17,7 +17,7 @@ HYD_status HYD_rmku_query_node_list(int *num_cores, struct HYD_proxy **proxy_lis
/* We just query the bootstrap server for the node list and return
* it to the upper layer. */
status = HYDT_bsci_query_node_list(num_cores, proxy_list);
status = HYDT_bsci_query_node_list(node_list);
HYDU_ERR_POP(status, "bootstrap device returned error while querying node list\n");
fn_exit:
......
......@@ -9,7 +9,10 @@
#include "hydra_base.h"
HYD_status HYDT_bscd_fork_launch_procs(char **global_args, const char *proxy_id_str,
struct HYD_proxy *proxy_list);
HYD_status HYDT_bscd_fork_launch_procs(
char **args, struct HYD_node *node_list, void *userp,
HYD_status(*stdin_cb) (int fd, HYD_event_t events, void *userp),
HYD_status(*stdout_cb) (int fd, HYD_event_t events, void *userp),
HYD_status(*stderr_cb) (int fd, HYD_event_t events, void *userp));
#endif /* FORK_H_INCLUDED */
......@@ -9,49 +9,76 @@
#include "bscu.h"
#include "fork.h"
HYD_status HYDT_bscd_fork_launch_procs(char **global_args, const char *proxy_id_str,
struct HYD_proxy *proxy_list)
HYD_status HYDT_bscd_fork_launch_procs(
char **args, struct HYD_node *node_list, void *userp,
HYD_status(*stdin_cb) (int fd, HYD_event_t events, void *userp),
HYD_status(*stdout_cb) (int fd, HYD_event_t events, void *userp),
HYD_status(*stderr_cb) (int fd, HYD_event_t events, void *userp))
{
struct HYD_proxy *proxy;
char *client_arg[HYD_NUM_TMP_STRINGS];
int i, arg, process_id;
int num_hosts, idx, i, fd_stdin, fd_stdout, fd_stderr;
int *pid, *fd_list;
struct HYD_node *node;
char *targs[HYD_NUM_TMP_STRINGS];
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
process_id = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next) {
/* Setup the executable arguments */
arg = 0;
idx = 0;
for (i = 0; args[i]; i++)
targs[idx++] = HYDU_strdup(args[i]);
for (i = 0; global_args[i]; i++)
client_arg[arg++] = HYDU_strdup(global_args[i]);
/* pid_list might already have some PIDs */
num_hosts = 0;
for (node = node_list; node; node = node->next)
num_hosts++;
if (proxy_id_str) {
client_arg[arg++] = HYDU_strdup(proxy_id_str);
client_arg[arg++] = HYDU_int_to_str(proxy->proxy_id);
}
/* Increase pid list to accommodate these new pids */
HYDU_MALLOC(pid, int *, (HYD_bscu_pid_count + num_hosts) * sizeof(int), status);
for (i = 0; i < HYD_bscu_pid_count; i++)
pid[i] = HYD_bscu_pid_list[i];
HYDU_FREE(HYD_bscu_pid_list);
HYD_bscu_pid_list = pid;
client_arg[arg++] = NULL;
/* Increase fd list to accommodate these new fds */
HYDU_MALLOC(fd_list, int *, (HYD_bscu_fd_count + (2 * num_hosts) + 1) * sizeof(int),
status);
for (i = 0; i < HYD_bscu_fd_count; i++)
fd_list[i] = HYD_bscu_fd_list[i];
HYDU_FREE(HYD_bscu_fd_list);
HYD_bscu_fd_list = fd_list;
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout, "Launching process: ");
HYDU_print_strlist(client_arg);
}
for (i = 0, node = node_list; node; node = node->next, i++) {
/* append proxy ID */
targs[idx] = HYDU_int_to_str(i);
targs[idx + 1] = NULL;
/* The stdin pointer will be some value for process_id 0; for
* everyone else, it's NULL. */
status = HYDU_create_process(client_arg, NULL,
(process_id == 0 ? &proxy->in : NULL),
&proxy->out, &proxy->err, &proxy->pid, -1);
status = HYDU_create_process(targs, NULL, (i == 0 ? &fd_stdin : NULL),
&fd_stdout, &fd_stderr,
&HYD_bscu_pid_list[HYD_bscu_pid_count++], -1);
HYDU_ERR_POP(status, "create process returned error\n");
HYDU_free_strlist(client_arg);
if (i == 0)
HYD_bscu_fd_list[HYD_bscu_fd_count++] = fd_stdin;
HYD_bscu_fd_list[HYD_bscu_fd_count++] = fd_stdout;
HYD_bscu_fd_list[HYD_bscu_fd_count++] = fd_stderr;
/* Register stdio callbacks for the spawned process */
if (i == 0) {
status = HYDT_dmx_register_fd(1, &fd_stdin, HYD_STDIN, userp, stdin_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
status = HYDT_dmx_register_fd(1, &fd_stdout, HYD_STDOUT, userp, stdout_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
process_id++;
status = HYDT_dmx_register_fd(1, &fd_stderr, HYD_STDOUT, userp, stderr_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
fn_exit:
HYDU_free_strlist(targs);
HYDU_FUNC_EXIT();
return status;
......
......@@ -33,17 +33,20 @@ struct HYDT_bsci_info {
*/
struct HYDT_bsci_fns {
/** \brief Launch processes */
HYD_status(*launch_procs) (char **global_args, const char *proxy_id_str,
struct HYD_proxy *proxy_list);
HYD_status(*launch_procs) (
char **args, struct HYD_node *node_list, void *userp,
HYD_status(*stdin_cb) (int fd, HYD_event_t events, void *userp),
HYD_status(*stdout_cb) (int fd, HYD_event_t events, void *userp),
HYD_status(*stderr_cb) (int fd, HYD_event_t events, void *userp));
/** \brief Finalize the bootstrap control device */
HYD_status(*finalize) (void);
/** \brief Wait for bootstrap launched processes to complete */
HYD_status(*wait_for_completion) (struct HYD_proxy *proxy_list);
HYD_status(*wait_for_completion) (int timeout);
/** \brief Query for node list information */
HYD_status(*query_node_list) (int *num_cores, struct HYD_proxy **proxy_list);
HYD_status(*query_node_list) (struct HYD_node **node_list);
/** \brief Query for the universe size */
HYD_status(*query_usize) (int *size);
......@@ -76,21 +79,30 @@ HYD_status HYDT_bsci_init(char *bootstrap, char *bootstrap_exec, int enablex, in
/**
* \brief HYDT_bsci_launch_procs - Launch processes
*
* \param[in] global_args Arguments to be used for the launched processes
* \param[in] proxy_id_str String to prepend in the arguments for the
* proxy ID (use -1 if the proxy should query for the ID)
* \param[in] proxy_list List of proxy structures containing the list of hosts
* \param[in] args Arguments to be used for the launched processes
* \param[in] node_list List of nodes to launch processes on
* \param[in] userp Arbitrary user pointer that is passed to the callback functions
* \param[in] stdin_cb Stdin callback function
* \param[in] stdout_cb Stdout callback function
* \param[in] stderr_cb Stderr callback function
*
* This function uses the hosts information in the proxy_list to
* launch processes. Bootstrap servers that perform sequential
* launches (one process at a time), should set the proxy ID string in