Commit e39297d8 authored by Jayesh Krishna's avatar Jayesh Krishna
Browse files

[svn-r4213] # Initial cut of distributed proxies support in hydra

  - launch/shutdown proxies using job launcher
  - launch jobs using the standalone proxy
# DMX engine can now handle user contexts. The user registers the context when registering the fd & the DMX engine provides the context in the callback.
# Limitations (will be fixed soon...)
  - Code is a bit hackish... FIXMEs should cover a lot of them
  - Works only on localhost - debugging multiple hosts
  - Does not support MPMD
  - Supports only one job at a time
  - Need to provide complete path to executablea - to be fixed soon
parent d5bbafd3
......@@ -9,7 +9,7 @@
#include "bscu.h"
#include "fork.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_Status HYD_BSCD_fork_launch_procs(void)
{
......
......@@ -9,7 +9,7 @@
#include "bscu.h"
#include "slurm.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_Status HYD_BSCD_slurm_launch_procs(void)
{
......
......@@ -9,7 +9,7 @@
#include "bscu.h"
#include "ssh.h"
HYD_Handle handle;
extern HYD_Handle handle;
/*
* HYD_BSCD_ssh_launch_procs: For each process, we create an
......
......@@ -8,7 +8,7 @@
#include "hydra_utils.h"
#include "bscu.h"
HYD_Handle handle;
extern HYD_Handle handle;
/*
* HYD_BSCU_wait_for_completion: We first wait for communication
......
......@@ -10,7 +10,7 @@
#include "pmci.h"
#include "demux.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_Status HYD_CSI_close_fd(int fd)
{
......
......@@ -10,7 +10,7 @@
#include "pmci.h"
#include "demux.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_Status HYD_CSI_launch_procs(void)
{
......@@ -24,11 +24,13 @@ HYD_Status HYD_CSI_launch_procs(void)
HYDU_ERR_POP(status, "PM returned error while launching processes\n");
for (partition = handle.partition_list; partition; partition = partition->next) {
status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, handle.stdout_cb);
status = HYD_DMX_register_fd(1, &partition->out, HYD_STDOUT, NULL, handle.stdout_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, handle.stderr_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
if(partition->err != -1) {
status = HYD_DMX_register_fd(1, &partition->err, HYD_STDOUT, NULL, handle.stderr_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
}
if (handle.in != -1) { /* Only process_id 0 */
......@@ -42,7 +44,7 @@ HYD_Status HYD_CSI_launch_procs(void)
handle.stdin_buf_count = 0;
handle.stdin_buf_offset = 0;
status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, handle.stdin_cb);
status = HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, handle.stdin_cb);
HYDU_ERR_POP(status, "demux returned error registering fd\n");
}
......
......@@ -10,7 +10,7 @@
#include "pmci.h"
#include "demux.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_Status HYD_CSI_wait_for_completion(void)
{
......
......@@ -14,15 +14,16 @@ typedef struct HYD_DMXI_callback {
int num_fds;
int *fd;
HYD_Event_t events;
HYD_Status(*callback) (int fd, HYD_Event_t events);
void *userp;
HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp);
struct HYD_DMXI_callback *next;
} HYD_DMXI_callback_t;
static HYD_DMXI_callback_t *cb_list = NULL;
HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
HYD_Status(*callback) (int fd, HYD_Event_t events))
HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp))
{
HYD_DMXI_callback_t *cb_element, *run;
int i;
......@@ -39,6 +40,7 @@ HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
HYDU_MALLOC(cb_element->fd, int *, num_fds * sizeof(int), status);
memcpy(cb_element->fd, fd, num_fds * sizeof(int));
cb_element->events = events;
cb_element->userp = userp;
cb_element->callback = callback;
cb_element->next = NULL;
......@@ -158,7 +160,7 @@ HYD_Status HYD_DMX_wait_for_event(int time)
if (pollfds[i].revents & POLLIN)
events |= HYD_STDOUT;
status = run->callback(pollfds[i].fd, events);
status = run->callback(pollfds[i].fd, events, run->userp);
HYDU_ERR_POP(status, "callback returned error status\n");
}
......
......@@ -9,8 +9,8 @@
#include "hydra.h"
HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events,
HYD_Status(*callback) (int fd, HYD_Event_t events));
HYD_Status HYD_DMX_register_fd(int num_fds, int *fd, HYD_Event_t events, void *userp,
HYD_Status(*callback) (int fd, HYD_Event_t events, void *userp));
HYD_Status HYD_DMX_deregister_fd(int fd);
HYD_Status HYD_DMX_wait_for_event(int time);
HYD_Status HYD_DMX_finalize(void);
......
......@@ -14,7 +14,16 @@
struct HYD_Handle_ {
char *base_path;
int proxy_port;
/* The persistent proxy is different from the centralized proxy
* and hence needs its own port - pproxy_port */
int pproxy_port;
char *bootstrap;
/* FIXME: We should define a proxy type instead of all these
* flags... proxy_type = PROXY_LAUNCHER | PROXY_TERMINATOR
*/
int is_proxy_launcher;
int is_proxy_terminator;
int is_proxy_remote;
HYD_Binding binding;
char *user_bind_map;
......@@ -31,9 +40,9 @@ struct HYD_Handle_ {
HYD_Env_t *prop_env;
int in;
HYD_Status(*stdin_cb) (int fd, HYD_Event_t events);
HYD_Status(*stdout_cb) (int fd, HYD_Event_t events);
HYD_Status(*stderr_cb) (int fd, HYD_Event_t events);
HYD_Status(*stdin_cb) (int fd, HYD_Event_t events, void *userp);
HYD_Status(*stdout_cb) (int fd, HYD_Event_t events, void *userp);
HYD_Status(*stderr_cb) (int fd, HYD_Event_t events, void *userp);
/* Start time and timeout. These are filled in by the launcher,
* but are utilized by the demux engine and the boot-strap server
......@@ -57,4 +66,7 @@ typedef struct HYD_Handle_ HYD_Handle;
extern HYD_Handle handle;
#define HYD_PROXY_NAME "pmi_proxy"
#define HYD_PPROXY_PORT 8677
#endif /* HYDRA_H_INCLUDED */
......@@ -34,6 +34,10 @@
#include <sys/types.h>
#endif /* HAVE_SYS_TYPES_H */
#if defined HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif /* HAVE_SYS_STAT_H */
#include <errno.h>
#if defined HAVE_GETTIMEOFDAY
......
......@@ -9,9 +9,9 @@
#include "mpiexec.h"
#include "csi.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
......@@ -38,8 +38,7 @@ HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
goto fn_exit;
}
HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
......@@ -67,7 +66,7 @@ HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
}
HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events)
HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events, void *userp)
{
int closed;
HYD_Status status = HYD_SUCCESS;
......
......@@ -10,7 +10,7 @@
#include "lchu.h"
#include "csi.h"
HYD_Handle handle;
extern HYD_Handle handle;
static void usage(void)
{
......
......@@ -10,8 +10,8 @@
#include "hydra.h"
HYD_Status HYD_LCHI_get_parameters(char **t_argv);
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events);
HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events);
HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events);
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events, void *userp);
#endif /* MPIEXEC_H_INCLUDED */
......@@ -11,7 +11,7 @@
#define HYDRA_MAX_PATH 4096
HYD_Handle handle;
extern HYD_Handle handle;
#define INCREMENT_ARGV(status) \
{ \
......@@ -33,7 +33,7 @@ HYD_Status HYD_LCHI_get_parameters(char **t_argv)
{
int i, local_env_set;
char **argv = t_argv, *tmp;
char *env_name, *env_value, *str[4] = { NULL }, *progname = *argv;
char *env_name, *env_value, *str[4] = { NULL, NULL, NULL, NULL }, *progname = *argv;
HYD_Env_t *env;
struct HYD_Exec_info *exec_info;
HYD_Status status = HYD_SUCCESS;
......@@ -68,6 +68,31 @@ HYD_Status HYD_LCHI_get_parameters(char **t_argv)
handle.enablex = !strcmp(*argv, "--enable-x");
continue;
}
if(!strcmp(*argv, "--boot-proxies")) {
/* FIXME: Prevent usage of incompatible params */
handle.bootstrap = HYDU_strdup("ssh");
handle.is_proxy_launcher = 1;
handle.prop = HYD_ENV_PROP_ALL;
continue;
}
if(!strcmp(*argv, "--remote-proxy")) {
/* FIXME: We should get rid of this option eventually.
* This should be the default case. The centralized
* version should use an option like "--local-proxy"
*/
handle.is_proxy_remote = 1;
handle.prop = HYD_ENV_PROP_ALL;
continue;
}
if(!strcmp(*argv, "--shutdown-proxies")) {
handle.is_proxy_remote = 1;
handle.is_proxy_terminator = 1;
handle.prop = HYD_ENV_PROP_ALL;
continue;
}
if (!strcmp(*argv, "-genvall")) {
HYDU_ERR_CHKANDJUMP(status, handle.prop != HYD_ENV_PROP_UNSET,
......@@ -237,12 +262,24 @@ HYD_Status HYD_LCHI_get_parameters(char **t_argv)
continue;
}
if (!strcmp(str[0], "--pproxy-port")) {
if (!str[1]) {
INCREMENT_ARGV(status);
str[1] = *argv;
}
HYDU_ERR_CHKANDJUMP(status, handle.pproxy_port != -1, HYD_INTERNAL_ERROR,
"duplicate persistent proxy port\n");
handle.pproxy_port = atoi(str[1]);
continue;
}
if (*argv[0] == '-')
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "unrecognized argument\n");
status = HYD_LCHU_get_current_exec_info(&exec_info);
HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
/* End of Job launcher option handling */
/* Read the executable till you hit the end of a ":" */
do {
if (!strcmp(*argv, ":")) { /* Next executable */
......@@ -262,8 +299,32 @@ HYD_Status HYD_LCHI_get_parameters(char **t_argv)
break;
continue;
}
/* In the case of the proxy launcher, aka --boot-proxies, there is no executable
* specified */
if(handle.is_proxy_launcher || handle.is_proxy_terminator) {
if(handle.exec_info_list != NULL)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"No executable should be specified when booting proxies\n");
status = HYD_LCHU_get_current_exec_info(&exec_info);
HYDU_ERR_POP(status, "get_current_exec_info returned error\n");
exec_info->exec[0] = HYDU_strdup(HYD_PROXY_NAME" --persistent");
exec_info->exec[1] = NULL;
exec_info->exec_proc_count = 1;
env_name = HYDU_strdup("HYD_PROXY_PORT");
env_value = HYDU_int_to_str(handle.pproxy_port);
status = HYDU_env_create(&env, env_name, env_value);
HYDU_ERR_POP(status, "unable to create env struct\n");
HYDU_append_env_to_list(*env, &exec_info->user_env);
}
tmp = getenv("MPIEXEC_DEBUG");
if (handle.debug == -1 && tmp)
handle.debug = atoi(tmp) ? 1 : 0;
......
......@@ -14,7 +14,11 @@ void HYD_LCHU_init_params(void)
{
handle.base_path = NULL;
handle.proxy_port = -1;
handle.pproxy_port = HYD_PPROXY_PORT;
handle.bootstrap = NULL;
handle.is_proxy_launcher = 0;
handle.is_proxy_terminator = 0;
handle.is_proxy_remote = 0;
handle.binding = HYD_BIND_UNSET;
handle.user_bind_map = NULL;
......
......@@ -8,7 +8,7 @@
#include "pmi_handle.h"
#include "pmi_handle_v1.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_PMCD_pmi_pg_t *pg_list = NULL;
struct HYD_PMCD_pmi_handle *HYD_PMCD_pmi_v1;
......
......@@ -11,7 +11,7 @@
#include "pmi_handle.h"
#include "pmi_handle_v1.h"
HYD_Handle handle;
extern HYD_Handle handle;
HYD_PMCD_pmi_pg_t *pg_list;
/* TODO: abort, create_kvs, destroy_kvs, getbyidx, spawn */
......
......@@ -12,6 +12,76 @@
struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
int HYD_PMCD_pmi_proxy_listenfd;
static HYD_Status HYD_PMCD_pmi_pproxy_start(void ) {
/* If this function exits... its always an error */
HYD_Status status = HYD_INTERNAL_ERROR;
int ret = 0;
pid_t proc_id = -1;
struct rlimit rl;
umask(0);
/* Get the limit of fds */
ret = getrlimit(RLIMIT_NOFILE, &rl);
if(ret == -1)
HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "getrlimit() failed (%s)\n",
HYDU_strerror(errno));
proc_id = fork();
if(proc_id > 0 ) {
/* Ignore exit from child proc - persistent pmi proxy */
status = HYDU_set_signal(SIGCHLD, SIG_IGN);
HYDU_ERR_POP(status, "Setting SIGCHLD handler to SIG_IGN failed\n");
/* Parent process exits */
if(!HYD_PMCD_pmi_proxy_params.debug)
exit(0);
}
else if(proc_id == 0 ) {
/* Child proc continues */
int i;
pid_t spid;
spid = setsid();
if(spid == -1)
HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "setsid() failed(%s)\n",
HYDU_strerror(errno));
if(!HYD_PMCD_pmi_proxy_params.debug)
for(i=0; i<rl.rlim_max; i++)
close(i);
/* FIXME: dup(0,1,2) to "/dev/null" */
if(getenv("HYD_PROXY_PORT"))
HYD_PMCD_pmi_proxy_params.proxy_port = atoi(getenv("HYD_PROXY_PORT"));
else
HYD_PMCD_pmi_proxy_params.proxy_port = -1;
status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
(uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
HYDU_ERR_POP(status, "unable to listen on socket\n");
/* Register the listening socket with the demux engine */
status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
HYD_PMCD_pmi_proxy_listen_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
}
else {
HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "fork() failed (%s) \n",
HYDU_strerror(errno));
}
while(1) {
status = HYD_DMX_wait_for_event(-1);
HYDU_ERR_POP(status, "demux engine error waiting for event\n");
}
fn_exit:
return status;
fn_fail:
goto fn_exit;
}
int main(int argc, char **argv)
{
int i, j, arg, count, pid, ret_status;
......@@ -26,12 +96,18 @@ int main(int argc, char **argv)
status = HYD_PMCD_pmi_proxy_get_params(argc, argv);
HYDU_ERR_POP(status, "bad parameters passed to the proxy\n");
if(HYD_PMCD_pmi_proxy_params.is_persistent) {
status = HYD_PMCD_pmi_pproxy_start();
HYDU_ERR_POP(status, "Error starting persistent PMI proxy\n");
goto fn_exit;
}
status = HYDU_sock_listen(&HYD_PMCD_pmi_proxy_listenfd, NULL,
(uint16_t *) & HYD_PMCD_pmi_proxy_params.proxy_port);
HYDU_ERR_POP(status, "unable to listen on socket\n");
/* Register the listening socket with the demux engine */
status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT,
status = HYD_DMX_register_fd(1, &HYD_PMCD_pmi_proxy_listenfd, HYD_STDOUT, NULL,
HYD_PMCD_pmi_proxy_listen_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
......@@ -123,7 +199,7 @@ int main(int argc, char **argv)
HYD_PMCD_pmi_proxy_params.stdin_buf_offset = 0;
HYD_PMCD_pmi_proxy_params.stdin_buf_count = 0;
status =
HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, HYD_PMCD_pmi_proxy_stdin_cb);
HYD_DMX_register_fd(1, &stdin_fd, HYD_STDIN, NULL, HYD_PMCD_pmi_proxy_stdin_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
}
else {
......@@ -142,12 +218,12 @@ int main(int argc, char **argv)
/* Everything is spawned, now wait for I/O */
status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
HYD_PMCD_pmi_proxy_params.out,
HYD_STDOUT, HYD_PMCD_pmi_proxy_stdout_cb);
HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stdout_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
status = HYD_DMX_register_fd(HYD_PMCD_pmi_proxy_params.exec_proc_count,
HYD_PMCD_pmi_proxy_params.err,
HYD_STDOUT, HYD_PMCD_pmi_proxy_stderr_cb);
HYD_STDOUT, NULL, HYD_PMCD_pmi_proxy_stderr_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
while (1) {
......
......@@ -11,7 +11,10 @@
#include "hydra_utils.h"
struct HYD_PMCD_pmi_proxy_params {
int debug;
int proxy_port;
int is_persistent;
char *wdir;
HYD_Binding binding;
char *user_bind_map;
......@@ -31,6 +34,7 @@ struct HYD_PMCD_pmi_proxy_params {
int *err;
int *exit_status;
int in;
int rproxy_connfd;
int stdin_buf_offset;
int stdin_buf_count;
......@@ -40,10 +44,18 @@ struct HYD_PMCD_pmi_proxy_params {
extern struct HYD_PMCD_pmi_proxy_params HYD_PMCD_pmi_proxy_params;
extern int HYD_PMCD_pmi_proxy_listenfd;
HYD_Status HYD_PMCD_pmi_proxy_init_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
HYD_Status HYD_PMCD_pmi_proxy_cleanup_params(struct HYD_PMCD_pmi_proxy_params *proxy_params);
HYD_Status HYD_PMCD_pmi_proxy_get_params(int t_argc, char **t_argv);
HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events);
HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events);
HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events);
HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events);
HYD_Status HYD_PMCD_pmi_proxy_get_next_keyvalp(char **bufp, int *buf_lenp, char **keyp,
int *key_lenp, char **valp, int *val_lenp, char separator);
HYD_Status HYD_PMCD_pmi_proxy_handle_cmd(int fd, char *cmd, int cmd_len);
HYD_Status HYD_PMCD_pmi_proxy_handle_launch_cmd(int job_connfd, char *launch_cmd, int cmd_len);
HYD_Status HYD_PMCD_pmi_proxy_listen_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_remote_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_rstdout_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_stdout_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_stderr_cb(int fd, HYD_Event_t events, void *userp);
HYD_Status HYD_PMCD_pmi_proxy_stdin_cb(int fd, HYD_Event_t events, void *userp);
#endif /* PMI_PROXY_H_INCLUDED */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment