Commit 517115b4 authored by Pavan Balaji
Browse files

[svn-r4156] Added support for allowing each proxy to handle non-contiguous

PMI_IDs. This lets us use a single proxy for multiple executables when
they are launched on the same node. This should fix ticket #464 and is
a big step towards tickets #445 (hierarchical proxies part) and #457
(process-core mapping).
parent d640ab0d
......@@ -13,8 +13,7 @@ HYD_Handle handle;
HYD_Status HYD_BSCD_fork_launch_procs(void)
{
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
char *client_arg[HYD_EXEC_ARGS];
int i, arg, process_id;
HYD_Status status = HYD_SUCCESS;
......@@ -26,33 +25,28 @@ HYD_Status HYD_BSCD_fork_launch_procs(void)
* they want launched. Without this functionality, the proxy
* cannot use this and will have to perform its own launch. */
process_id = 0;
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->group_rank) /* Only rank 0 is spawned */
continue;
for (partition = handle.partition_list; partition; partition = partition->next) {
/* Setup the executable arguments */
arg = 0;
for (i = 0; partition->args[i]; i++)
client_arg[arg++] = MPIU_Strdup(partition->args[i]);
client_arg[arg++] = NULL;
/* Setup the executable arguments */
arg = 0;
for (i = 0; partition->proxy_args[i]; i++)
client_arg[arg++] = MPIU_Strdup(partition->proxy_args[i]);
client_arg[arg++] = NULL;
/* The stdin pointer will be some value for process_id 0;
* for everyone else, it's NULL. */
status = HYDU_create_process(client_arg, (process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err, &partition->pid,
-1);
HYDU_ERR_POP(status, "create process returned error\n");
/* The stdin pointer will be some value for process_id 0; for
* everyone else, it's NULL. */
status = HYDU_create_process(client_arg, (process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err, &partition->pid, -1);
HYDU_ERR_POP(status, "create process returned error\n");
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
/* For the remaining processes, set the stdin fd to -1 */
if (process_id != 0)
handle.in = -1;
/* For the remaining processes, set the stdin fd to -1 */
if (process_id != 0)
handle.in = -1;
process_id++;
}
process_id++;
}
fn_exit:
......
......@@ -13,8 +13,7 @@ HYD_Handle handle;
HYD_Status HYD_BSCD_slurm_launch_procs(void)
{
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
char *client_arg[HYD_EXEC_ARGS];
int i, arg, process_id;
HYD_Status status = HYD_SUCCESS;
......@@ -26,52 +25,39 @@ HYD_Status HYD_BSCD_slurm_launch_procs(void)
* they want launched. Without this functionality, the proxy
* cannot use this and will have to perform its own launch. */
process_id = 0;
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->group_rank) /* Only rank 0 is spawned */
continue;
for (partition = handle.partition_list; partition; partition = partition->next) {
/* Setup the executable arguments */
arg = 0;
/* FIXME: Get the path to srun */
client_arg[arg++] = MPIU_Strdup("srun");
/* Setup the executable arguments */
arg = 0;
/* FIXME: Get the path to srun */
client_arg[arg++] = MPIU_Strdup("srun");
/* Allow X forwarding only if explicitly requested */
if (handle.enablex == 1)
client_arg[arg++] = MPIU_Strdup("-X");
else if (handle.enablex == 0)
client_arg[arg++] = MPIU_Strdup("-x");
else /* default mode is disable X */
client_arg[arg++] = MPIU_Strdup("-x");
/* Currently, we do not support any partition names other than
* host names */
client_arg[arg++] = MPIU_Strdup(partition->name);
/* Currently, we do not support any partition names other
* than host names */
client_arg[arg++] = MPIU_Strdup(partition->name);
for (i = 0; partition->proxy_args[i]; i++)
client_arg[arg++] = MPIU_Strdup(partition->proxy_args[i]);
for (i = 0; partition->args[i]; i++)
client_arg[arg++] = MPIU_Strdup(partition->args[i]);
client_arg[arg++] = NULL;
client_arg[arg++] = NULL;
/* The stdin pointer will be some value for process_id 0;
* for everyone else, it's NULL. */
status = HYDU_create_process(client_arg, (process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err, &partition->pid,
-1);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
}
/* The stdin pointer will be some value for process_id 0; for
* everyone else, it's NULL. */
status = HYDU_create_process(client_arg, (process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err, &partition->pid, -1);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
}
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
/* For the remaining processes, set the stdin fd to -1 */
if (process_id != 0)
handle.in = -1;
/* For the remaining processes, set the stdin fd to -1 */
if (process_id != 0)
handle.in = -1;
process_id++;
}
process_id++;
}
fn_exit:
......
......@@ -19,8 +19,7 @@ HYD_Handle handle;
*/
HYD_Status HYD_BSCD_ssh_launch_procs(void)
{
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
char *client_arg[HYD_EXEC_ARGS];
int i, arg, process_id;
HYD_Status status = HYD_SUCCESS;
......@@ -32,47 +31,42 @@ HYD_Status HYD_BSCD_ssh_launch_procs(void)
* they want launched. Without this functionality, the proxy
* cannot use this and will have to perform its own launch. */
process_id = 0;
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->group_rank) /* Only rank 0 is spawned */
continue;
/* Setup the executable arguments */
arg = 0;
client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
/* Allow X forwarding only if explicitly requested */
if (handle.enablex == 1)
client_arg[arg++] = MPIU_Strdup("-X");
else if (handle.enablex == 0)
client_arg[arg++] = MPIU_Strdup("-x");
else /* default mode is disable X */
client_arg[arg++] = MPIU_Strdup("-x");
/* ssh does not support any partition names other than host names */
client_arg[arg++] = MPIU_Strdup(partition->name);
for (i = 0; partition->args[i]; i++)
client_arg[arg++] = MPIU_Strdup(partition->args[i]);
client_arg[arg++] = NULL;
/* The stdin pointer will be some value for process_id 0;
* for everyone else, it's NULL. */
status = HYDU_create_process(client_arg, (process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err, &partition->pid,
-1);
HYDU_ERR_POP(status, "create process returned error\n");
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
/* For the remaining processes, set the stdin fd to -1 */
if (process_id != 0)
handle.in = -1;
process_id++;
}
for (partition = handle.partition_list; partition; partition = partition->next) {
/* Setup the executable arguments */
arg = 0;
client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
/* Allow X forwarding only if explicitly requested */
if (handle.enablex == 1)
client_arg[arg++] = MPIU_Strdup("-X");
else if (handle.enablex == 0)
client_arg[arg++] = MPIU_Strdup("-x");
else /* default mode is disable X */
client_arg[arg++] = MPIU_Strdup("-x");
/* ssh does not support any partition names other than host names */
client_arg[arg++] = MPIU_Strdup(partition->name);
for (i = 0; partition->proxy_args[i]; i++)
client_arg[arg++] = MPIU_Strdup(partition->proxy_args[i]);
client_arg[arg++] = NULL;
/* The stdin pointer will be some value for process_id 0; for
* everyone else, it's NULL. */
status = HYDU_create_process(client_arg, (process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err, &partition->pid, -1);
HYDU_ERR_POP(status, "create process returned error\n");
for (arg = 0; client_arg[arg]; arg++)
HYDU_FREE(client_arg[arg]);
/* For the remaining processes, set the stdin fd to -1 */
if (process_id != 0)
handle.in = -1;
process_id++;
}
fn_exit:
......
......@@ -19,17 +19,15 @@ HYD_Handle handle;
HYD_Status HYD_BSCU_wait_for_completion(void)
{
int pid, ret_status, not_completed;
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
not_completed = 0;
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next)
for (partition = proc_params->partition; partition; partition = partition->next)
if (partition->exit_status == -1)
not_completed++;
for (partition = handle.partition_list; partition; partition = partition->next)
if (partition->exit_status == -1)
not_completed++;
/* We get here only after the I/O sockets have been closed. If the
* application did not manually close its stdout and stderr
......@@ -40,14 +38,10 @@ HYD_Status HYD_BSCU_wait_for_completion(void)
pid = waitpid(-1, &ret_status, WNOHANG);
if (pid > 0) {
/* Find the pid and mark it as complete. */
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;
partition = partition->next) {
if (partition->pid == pid) {
partition->exit_status = WEXITSTATUS(ret_status);
not_completed--;
}
for (partition = handle.partition_list; partition; partition = partition->next) {
if (partition->pid == pid) {
partition->exit_status = WEXITSTATUS(ret_status);
not_completed--;
}
}
}
......
......@@ -14,8 +14,7 @@ HYD_Handle handle;
HYD_Status HYD_CSI_close_fd(int fd)
{
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
......@@ -26,16 +25,14 @@ HYD_Status HYD_CSI_close_fd(int fd)
close(fd);
/* Find the FD in the handle and remove it. */
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->out == fd) {
partition->out = -1;
goto fn_exit;
}
if (partition->err == fd) {
partition->err = -1;
goto fn_exit;
}
for (partition = handle.partition_list; partition; partition = partition->next) {
if (partition->out == fd) {
partition->out = -1;
goto fn_exit;
}
if (partition->err == fd) {
partition->err = -1;
goto fn_exit;
}
}
......
......@@ -14,8 +14,7 @@ HYD_Handle handle;
HYD_Status HYD_CSI_launch_procs(void)
{
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
int stdin_fd;
HYD_Status status = HYD_SUCCESS;
......@@ -24,16 +23,12 @@ HYD_Status HYD_CSI_launch_procs(void)
status = HYD_PMCI_launch_procs();
HYDU_ERR_POP(status, "PM returned error while launching processes\n");
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
status =
HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, proc_params->stdout_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
for (partition = handle.partition_list; partition; partition = partition->next) {
status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, handle.stdout_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
status =
HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, proc_params->stderr_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, handle.stderr_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
if (handle.in != -1) { /* Only process_id 0 */
......
......@@ -15,8 +15,7 @@ HYD_Handle handle;
HYD_Status HYD_CSI_wait_for_completion(void)
{
int sockets_open;
struct HYD_Proc_params *proc_params;
struct HYD_Partition_list *partition;
struct HYD_Partition *partition;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
......@@ -29,15 +28,11 @@ HYD_Status HYD_CSI_wait_for_completion(void)
/* Check to see if there's any open read socket left; if there
* are, we will just wait for more events. */
sockets_open = 0;
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->out != -1 || partition->err != -1) {
sockets_open++;
break;
}
}
if (sockets_open)
for (partition = handle.partition_list; partition; partition = partition->next) {
if (partition->out != -1 || partition->err != -1) {
sockets_open++;
break;
}
}
if (sockets_open && HYDU_time_left(handle.start, handle.timeout))
......
......@@ -32,8 +32,7 @@ HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
for (i = 0; i < num_fds; i++)
if (fd[i] < 0)
HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "registering bad fd %d\n",
fd[i]);
HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "registering bad fd %d\n", fd[i]);
HYDU_MALLOC(cb_element, HYD_DMXI_callback_t *, sizeof(HYD_DMXI_callback_t), status);
cb_element->num_fds = num_fds;
......
......@@ -30,6 +30,8 @@ struct HYD_Handle_ {
int in;
HYD_Status(*stdin_cb) (int fd, HYD_Event_t events);
HYD_Status(*stdout_cb) (int fd, HYD_Event_t events);
HYD_Status(*stderr_cb) (int fd, HYD_Event_t events);
/* Start time and timeout. These are filled in by the launcher,
* but are utilized by the demux engine and the boot-strap server
......@@ -37,26 +39,10 @@ struct HYD_Handle_ {
HYD_Time start;
HYD_Time timeout;
/* Each structure will contain all hosts/cores that use the same
 * executable and environment.
 *
 * Nodes are chained through `next`; the list head is the
 * `proc_params` member declared by the trailing declarator below. */
struct HYD_Proc_params {
/* NOTE(review): presumably the number of processes to start for this
 * executable -- confirm against the launcher. */
int exec_proc_count;
/* Argument vector of the executable; holds at most HYD_EXEC_ARGS
 * entries. */
char *exec[HYD_EXEC_ARGS];
/* NOTE(review): meaning is not visible here; presumably the number of
 * processes covered by one pass over the host file -- confirm. */
int one_pass_count;
/* Head of the list of partitions attached to this executable (walked
 * through each partition's `next` pointer by the launch and wait
 * code). */
struct HYD_Partition_list *partition;
/* Local environment */
HYD_Env_t *user_env;
HYD_Env_prop_t prop;
HYD_Env_t *prop_env;
/* Callback functions for the stdout/stderr events. These can
 * be the same. They are handed to HYD_DMX_register_fd() together
 * with each partition's `out` and `err` descriptors. */
HYD_Status(*stdout_cb) (int fd, HYD_Event_t events);
HYD_Status(*stderr_cb) (int fd, HYD_Event_t events);
/* Next element of the list. */
struct HYD_Proc_params *next;
} *proc_params;
struct HYD_Exec_info *exec_info_list;
struct HYD_Partition *partition_list;
/* Random parameters used for internal code */
int func_depth;
......@@ -67,20 +53,6 @@ struct HYD_Handle_ {
typedef struct HYD_Handle_ HYD_Handle;
/* We'll use this as the central handle that has most of the
* information needed by everyone. All data to be written has to be
* done before the HYD_CSI_wait_for_completion() function is called,
* except for two exceptions:
*
* 1. The timeout value is initially added by the launcher before the
* HYD_CSI_wait_for_completion() function is called, but can be edited
* by the control system within this call. There's no guarantee on
* what value it will contain for the other layers.
*
* 2. There is no guarantee on what the exit status will contain till
* the HYD_CSI_wait_for_completion() function returns (where the
* bootstrap server can fill out these values).
*/
extern HYD_Handle handle;
#endif /* HYDRA_H_INCLUDED */
......@@ -58,6 +58,16 @@ extern char **environ;
#define HYD_DEFAULT_PROXY_PORT 9899
/* Event identifiers carried in HYD_Event_t values, e.g. the `events`
 * argument of HYD_DMX_register_fd().
 * NOTE(review): callers register both the stdout and the stderr
 * descriptor of a partition with HYD_STDOUT, so it presumably means
 * "output to be read from a child" rather than stdout only -- confirm. */
#define HYD_STDOUT (1)
#define HYD_STDIN (2)
/* Integer type used to pass the event identifiers above. */
typedef unsigned short HYD_Event_t;
/* 64 * 1024 = 65536. NOTE(review): presumably the byte size of
 * temporary I/O buffers; the users are not visible here. */
#define HYD_TMPBUF_SIZE (64 * 1024)
/* Capacity (entry count) of the fixed-size argument-vector arrays
 * declared as `char *x[HYD_EXEC_ARGS]`. */
#define HYD_EXEC_ARGS 200
/* Status information */
typedef enum {
HYD_SUCCESS = 0,
HYD_GRACEFUL_ABORT,
......@@ -67,6 +77,74 @@ typedef enum {
HYD_INTERNAL_ERROR
} HYD_Status;
/* Environment information.
 *
 * One environment variable, kept as a node of a singly linked list
 * chained through `next`. */
typedef struct HYD_Env {
/* Name of the variable. */
char *env_name;
/* Value of the variable. */
char *env_value;
/* Next node of the list. */
struct HYD_Env *next;
} HYD_Env_t;
/* Environment property selector; stored in the `prop` field of the
 * per-executable structures next to a `prop_env` list.
 * NOTE(review): "prop" presumably stands for "propagation" (which part
 * of the environment is passed on to launched processes) -- confirm
 * against the code that consumes these values. */
typedef enum {
/* First enumerator, so its value is 0. */
HYD_ENV_PROP_UNSET,
HYD_ENV_PROP_ALL,
HYD_ENV_PROP_NONE,
/* NOTE(review): presumably selects the explicit `prop_env` list --
 * confirm. */
HYD_ENV_PROP_LIST
} HYD_Env_prop_t;
/* List of contiguous segments of processes on a partition.
 *
 * Each node describes one contiguous run of processes; a partition
 * holds a list of such nodes (chained through `next`), which is what
 * lets it cover a non-contiguous set of process IDs overall. */
struct HYD_Partition_segment {
/* NOTE(review): presumably the ID of the first process of this
 * segment -- confirm against the code that fills it in. */
int start_pid;
/* Number of processes in this segment. */
int proc_count;
/* NOTE(review): presumably per-process placement information such as
 * core IDs; the array length and termination are not visible here --
 * confirm. */
char **mapping;
/* Next segment of the same partition. */
struct HYD_Partition_segment *next;
};
/* Executables on a partition.
 *
 * One node per executable assigned to the partition, chained through
 * `next`. */
struct HYD_Partition_exec {
/* Argument vector of the executable; holds at most HYD_EXEC_ARGS
 * entries. NOTE(review): presumably NULL-terminated like the other
 * argument arrays in this code -- confirm. */
char *exec[HYD_EXEC_ARGS];
/* NOTE(review): presumably the number of processes of this executable
 * on this partition -- confirm. */
int proc_count;
/* Environment property selector and its associated list. */
HYD_Env_prop_t prop;
HYD_Env_t *prop_env;
/* Next executable on the same partition. */
struct HYD_Partition_exec *next;
};
/* Partition information.
 *
 * One node per partition; the nodes form a singly linked list (chained
 * through `next`) whose head is `handle.partition_list`. The bootstrap
 * launchers start one process per node of that list. */
struct HYD_Partition {
/* Partition name. The ssh and slurm launchers pass it verbatim as the
 * target argument, so only host names are supported there. */
char *name;
/* NOTE(review): presumably the sum of the process counts of the
 * segments/executables below -- confirm. */
int total_proc_count;
/* Segment list will contain one-pass of the hosts file */
struct HYD_Partition_segment *segment_list;
struct HYD_Partition_exec *exec_list;
/* Spawn information: each partition can have one or more
 * proxies. For the time being, we only support one proxy per
 * partition, but this can be easily extended later. We will also
 * need to give different ports for the proxies to listen on in
 * that case. */
/* Process ID of the spawned process, filled in by
 * HYDU_create_process(); matched against the waitpid() result when
 * collecting exit codes. */
int pid;
/* Descriptors for the spawned process's stdout and stderr, filled in
 * by HYDU_create_process(). HYD_CSI_close_fd() resets a descriptor to
 * -1 once it has been closed, and -1 is treated as "not open". */
int out;
int err;
/* -1 while the process has not been reaped; afterwards the
 * WEXITSTATUS() of its wait status. */
int exit_status;
/* The launchers copy entries from index 0 up to the first NULL entry,
 * so the array must be NULL-terminated. */
char *proxy_args[HYD_EXEC_ARGS]; /* Full argument list */
/* Next partition in the list. */
struct HYD_Partition *next;
};
/* Description of one executable: its argument vector and environment
 * settings. Nodes are chained through `next`; struct HYD_Handle_ keeps
 * a list of them in its `exec_info_list` member.
 *
 * NOTE(review): the trailing `*exec_info` declarator below does more
 * than define the type -- it also declares a file-scope pointer
 * variable named `exec_info` in every translation unit that sees this
 * definition. If this file is a shared header, that looks unintended
 * (and can cause duplicate-symbol link errors on toolchains that do
 * not merge tentative definitions) -- confirm whether anything uses
 * the variable before removing it. */
struct HYD_Exec_info {
/* NOTE(review): presumably the number of processes to start for this
 * executable -- confirm. */
int exec_proc_count;
/* Argument vector of the executable; holds at most HYD_EXEC_ARGS
 * entries. */
char *exec[HYD_EXEC_ARGS];
/* Local environment */
HYD_Env_t *user_env;
HYD_Env_prop_t prop;
HYD_Env_t *prop_env;
/* Next executable description in the list. */
struct HYD_Exec_info *next;
} *exec_info;
#define HYDU_ERR_POP(status, message) \
{ \
if (status != HYD_SUCCESS && status != HYD_GRACEFUL_ABORT) { \
......@@ -124,26 +202,6 @@ typedef enum {
} \
}
/* Event identifiers carried in HYD_Event_t values, e.g. the `events`
 * argument of HYD_DMX_register_fd(). */
#define HYD_STDOUT (1)
#define HYD_STDIN (2)
/* Integer type used to pass the event identifiers above. */
typedef unsigned short HYD_Event_t;
/* 64 * 1024 = 65536. NOTE(review): presumably a buffer size in bytes;
 * the users are not visible here. */
#define HYD_TMPBUF_SIZE (64 * 1024)
/* Capacity (entry count) of the fixed-size argument-vector arrays
 * declared as `char *x[HYD_EXEC_ARGS]`. */
#define HYD_EXEC_ARGS 200
/* One environment variable, kept as a node of a singly linked list
 * chained through `next`. */
typedef struct HYD_Env {
/* Name of the variable. */
char *env_name;
/* Value of the variable. */
char *env_value;
/* Next node of the list. */
struct HYD_Env *next;
} HYD_Env_t;
/* Environment property selector; stored in `prop` fields next to a
 * `prop_env` list.
 * NOTE(review): "prop" presumably stands for "propagation" -- confirm
 * against the code that consumes these values. */
typedef enum {
/* First enumerator, so its value is 0. */
HYD_ENV_PROP_UNSET,
HYD_ENV_PROP_ALL,
HYD_ENV_PROP_NONE,
HYD_ENV_PROP_LIST
} HYD_Env_prop_t;
#if defined ENABLE_WARNINGS
#define HYDU_Warn_printf HYDU_Error_printf
......
......@@ -24,6 +24,7 @@ HYD_Status HYDU_bind_process(int core);
/* env */
HYD_Env_t *HYDU_str_to_env(char *str);
HYD_Status HYDU_list_append_env_to_str(HYD_Env_t * env_list, char **str_list);
HYD_Status HYDU_list_global_env(HYD_Env_t ** env_list);
HYD_Env_t *HYDU_env_list_dup(HYD_Env_t * env);
......@@ -33,28 +34,19 @@ HYD_Status HYDU_env_free_list(HYD_Env_t * env);
HYD_Env_t *HYDU_env_lookup(HYD_Env_t env, HYD_Env_t * env_list);
HYD_Status HYDU_append_env_to_list(HYD_Env_t env, HYD_Env_t ** env_list);
void HYDU_putenv(char *env_str);
HYD_Status HYDU_comma_list_to_env_list(char *str, HYD_Env_t **env_list);
HYD_Status HYDU_comma_list_to_env_list(char *str, HYD_Env_t ** env_list);
/* launch */
struct HYD_Partition_list {
char *name;
int proc_count;
char **mapping; /* Can be core IDs or something else */
int group_id; /* Assumed to be in ascending order */
int group_rank; /* Rank within the group */
int pid;
int out;
int err;
int exit_status;
char *args[HYD_EXEC_ARGS];
struct HYD_Partition_list *next;