Commit 306a2eb2 authored by Pavan Balaji
Browse files

[svn-r3992] First working draft of the proxy. The proxy still doesn't handle
stdin. Also, the interface between the proxy and the bootstrap server
is broken, so a proxy has no way to launch more proxies. Thus, only a
one-level hierarchy of proxies is supported right now.
parent 1ea6aa33
......@@ -31,8 +31,7 @@ HYD_Status HYD_BSCI_Launch_procs(void)
status = HYD_BSCU_Set_common_signals(HYD_BSCU_Signal_handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("signal utils returned error when trying to set signal\n");
HYDU_Error_printf("signal utils returned error when trying to set signal\n");
goto fn_fail;
}
......@@ -41,10 +40,8 @@ HYD_Status HYD_BSCI_Launch_procs(void)
* they want launched. Without this functionality, the proxy
* cannot use this and will have to perform its own launch. */
process_id = 0;
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;
partition = partition->next) {
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->group_rank) /* Only rank 0 is spawned */
continue;
......@@ -72,11 +69,9 @@ HYD_Status HYD_BSCI_Launch_procs(void)
status =
HYDU_Create_process(client_arg,
(process_id == 0 ? &handle.in : NULL),
&partition->out, &partition->err,
&partition->pid);
&partition->out, &partition->err, &partition->pid);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("bootstrap spawn process returned error\n");
HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
}
......@@ -110,10 +105,8 @@ HYD_Status HYD_BSCI_Cleanup_procs(void)
HYDU_FUNC_ENTER();
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;
partition = partition->next) {
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
/* Setup the executable arguments */
arg = 0;
client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
......@@ -135,11 +128,9 @@ HYD_Status HYD_BSCI_Cleanup_procs(void)
client_arg[arg++] = MPIU_Strdup(execname);
client_arg[arg++] = NULL;
status =
HYDU_Create_process(client_arg, NULL, NULL, NULL, NULL);
status = HYDU_Create_process(client_arg, NULL, NULL, NULL, NULL);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("bootstrap spawn process returned error\n");
HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
}
......
......@@ -17,30 +17,26 @@ HYD_Status HYD_BSCU_Set_common_signals(void (*handler) (int))
status = HYDU_Set_signal(SIGINT, handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("signal utils returned error when trying to set SIGINT signal\n");
HYDU_Error_printf("signal utils returned error when trying to set SIGINT signal\n");
goto fn_fail;
}
status = HYDU_Set_signal(SIGQUIT, handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("signal utils returned error when trying to set SIGQUIT signal\n");
HYDU_Error_printf("signal utils returned error when trying to set SIGQUIT signal\n");
goto fn_fail;
}
status = HYDU_Set_signal(SIGTERM, handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("signal utils returned error when trying to set SIGTERM signal\n");
HYDU_Error_printf("signal utils returned error when trying to set SIGTERM signal\n");
goto fn_fail;
}
#if defined SIGSTOP
status = HYDU_Set_signal(SIGSTOP, handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("signal utils returned error when trying to set SIGSTOP signal\n");
HYDU_Error_printf("signal utils returned error when trying to set SIGSTOP signal\n");
goto fn_fail;
}
#endif /* SIGSTOP */
......@@ -48,8 +44,7 @@ HYD_Status HYD_BSCU_Set_common_signals(void (*handler) (int))
#if defined SIGCONT
status = HYDU_Set_signal(SIGCONT, handler);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("signal utils returned error when trying to set SIGCONT signal\n");
HYDU_Error_printf("signal utils returned error when trying to set SIGCONT signal\n");
goto fn_fail;
}
#endif /* SIGCONT */
......
......@@ -27,10 +27,8 @@ HYD_Status HYD_BSCU_Wait_for_completion(void)
HYDU_FUNC_ENTER();
not_completed = 0;
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next)
for (partition = proc_params->partition; partition;
partition = partition->next)
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next)
for (partition = proc_params->partition; partition; partition = partition->next)
if (partition->exit_status == -1)
not_completed++;
......
......@@ -25,19 +25,15 @@ HYD_Status HYD_CSI_Close_fd(int fd)
/* Deregister the FD with the demux engine and close it. */
status = HYD_DMX_Deregister_fd(fd);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("demux engine returned error when deregistering fd: %d\n",
fd);
HYDU_Error_printf("demux engine returned error when deregistering fd: %d\n", fd);
goto fn_fail;
}
close(fd);
/* Find the FD in the handle and remove it. */
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;
partition = partition->next) {
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->out == fd) {
partition->out = -1;
goto fn_exit;
......
......@@ -23,30 +23,23 @@ HYD_Status HYD_CSI_Launch_procs(void)
status = HYD_PMCI_Launch_procs();
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("process manager returned error when launching processes\n");
HYDU_Error_printf("process manager returned error when launching processes\n");
goto fn_fail;
}
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;
partition = partition->next) {
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
status =
HYD_DMX_Register_fd(1, &partition->out, HYD_STDOUT,
proc_params->stdout_cb);
HYD_DMX_Register_fd(1, &partition->out, HYD_STDOUT, proc_params->stdout_cb);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("demux engine returned error when registering fd\n");
HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
status =
HYD_DMX_Register_fd(1, &partition->err, HYD_STDOUT,
proc_params->stderr_cb);
HYD_DMX_Register_fd(1, &partition->err, HYD_STDOUT, proc_params->stderr_cb);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("demux engine returned error when registering fd\n");
HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
}
......@@ -71,11 +64,9 @@ HYD_Status HYD_CSI_Launch_procs(void)
handle.stdin_buf_count = 0;
handle.stdin_buf_offset = 0;
status =
HYD_DMX_Register_fd(1, &stdin_fd, HYD_STDOUT, handle.stdin_cb);
status = HYD_DMX_Register_fd(1, &stdin_fd, HYD_STDOUT, handle.stdin_cb);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("demux engine returned error when registering fd\n");
HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
}
......
......@@ -25,18 +25,15 @@ HYD_Status HYD_CSI_Wait_for_completion(void)
/* Wait for some event to occur */
status = HYD_DMX_Wait_for_event();
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("demux engine returned error when waiting for event\n");
HYDU_Error_printf("demux engine returned error when waiting for event\n");
goto fn_fail;
}
/* Check to see if there's any open read socket left; if there
* are, we will just wait for more events. */
sockets_open = 0;
for (proc_params = handle.proc_params; proc_params;
proc_params = proc_params->next) {
for (partition = proc_params->partition; partition;
partition = partition->next) {
for (proc_params = handle.proc_params; proc_params; proc_params = proc_params->next) {
for (partition = proc_params->partition; partition; partition = partition->next) {
if (partition->out != -1 || partition->err != -1) {
sockets_open++;
break;
......@@ -53,8 +50,7 @@ HYD_Status HYD_CSI_Wait_for_completion(void)
* control device will take care of that. */
status = HYD_BSCI_Wait_for_completion();
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("bootstrap server returned error when waiting for completion\n");
HYDU_Error_printf("bootstrap server returned error when waiting for completion\n");
goto fn_fail;
}
......
......@@ -21,9 +21,7 @@ int HYD_CSU_Time_left(void)
}
else {
gettimeofday(&now, NULL);
time_left =
(1000 *
(handle.timeout.tv_sec - now.tv_sec + handle.start.tv_sec));
time_left = (1000 * (handle.timeout.tv_sec - now.tv_sec + handle.start.tv_sec));
if (time_left < 0)
time_left = 0;
}
......
......@@ -44,8 +44,7 @@ static void print_callback_list()
}
HYD_Status HYD_DMX_Register_fd(int num_fds, int *fd, HYD_Event_t events,
HYD_Status(*callback) (int fd,
HYD_Event_t events))
HYD_Status(*callback) (int fd, HYD_Event_t events))
{
HYD_DMXI_Callback_t *cb_element, *run;
int i;
......@@ -61,8 +60,7 @@ HYD_Status HYD_DMX_Register_fd(int num_fds, int *fd, HYD_Event_t events,
}
}
HYDU_MALLOC(cb_element, HYD_DMXI_Callback_t *,
sizeof(HYD_DMXI_Callback_t), status);
HYDU_MALLOC(cb_element, HYD_DMXI_Callback_t *, sizeof(HYD_DMXI_Callback_t), status);
cb_element->num_fds = num_fds;
HYDU_MALLOC(cb_element->fd, int *, num_fds * sizeof(int), status);
memcpy(cb_element->fd, fd, num_fds * sizeof(int));
......@@ -134,8 +132,7 @@ HYD_Status HYD_DMX_Wait_for_event(void)
HYDU_FUNC_ENTER();
HYDU_MALLOC(pollfds, struct pollfd *,
num_cb_fds * sizeof(struct pollfd), status);
HYDU_MALLOC(pollfds, struct pollfd *, num_cb_fds * sizeof(struct pollfd), status);
run = cb_list;
i = 0;
......@@ -188,8 +185,7 @@ HYD_Status HYD_DMX_Wait_for_event(void)
status = run->callback(pollfds[i].fd, events);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("callback returned error status\n",
errno);
HYDU_Error_printf("callback returned error status\n", errno);
goto fn_fail;
}
}
......
......@@ -10,8 +10,7 @@
#include "hydra.h"
HYD_Status HYD_DMX_Register_fd(int num_fds, int *fd, HYD_Event_t events,
HYD_Status(*callback) (int fd,
HYD_Event_t events));
HYD_Status(*callback) (int fd, HYD_Event_t events));
HYD_Status HYD_DMX_Deregister_fd(int fd);
HYD_Status HYD_DMX_Wait_for_event(void);
HYD_Status HYD_DMX_Finalize(void);
......
......@@ -52,11 +52,11 @@ extern char **environ;
#endif /* MANUAL_EXTERN_ENVIRON */
typedef enum {
HYD_SUCCESS = 0,
HYD_NO_MEM,
HYD_SOCK_ERROR,
HYD_INVALID_PARAM,
HYD_INTERNAL_ERROR,
HYD_SUCCESS = 0
HYD_INTERNAL_ERROR
} HYD_Status;
#define HYD_STDOUT (1)
......
......@@ -12,13 +12,13 @@
HYD_Status HYDU_Env_global_list(HYD_Env_t ** env_list);
char *HYDU_Env_type_str(HYD_Env_type_t type);
HYD_Env_t *HYDU_Env_dup(HYD_Env_t env);
HYD_Env_t *HYDU_Env_found_in_list(HYD_Env_t * env_list, HYD_Env_t * env);
HYD_Env_t *HYDU_Env_found_in_list(HYD_Env_t * env_list, HYD_Env_t env);
HYD_Status HYDU_Env_add_to_list(HYD_Env_t ** env_list, HYD_Env_t env);
HYD_Env_t *HYDU_Env_listdup(HYD_Env_t * env);
HYD_Status HYDU_Env_create(HYD_Env_t ** env, char *env_name,
char *env_value, HYD_Env_type_t env_type,
int start);
char *env_value, HYD_Env_type_t env_type, int start);
HYD_Status HYDU_Env_free(HYD_Env_t * env);
HYD_Status HYDU_Env_free_list(HYD_Env_t * env);
HYD_Status HYDU_Env_putenv(HYD_Env_t env);
#endif /* HYDRA_ENV_H_INCLUDED */
......@@ -9,12 +9,11 @@
#include "hydra.h"
HYD_Status HYDU_Append_env(HYD_Env_t * env_list, char **client_arg,
int id);
HYD_Status HYDU_Append_env(HYD_Env_t * env_list, char **client_arg, int id);
HYD_Status HYDU_Append_exec(char **exec, char **client_arg);
HYD_Status HYDU_Append_wdir(char **client_arg);
HYD_Status HYDU_Append_wdir(char **client_arg, char *wdir);
HYD_Status HYDU_Allocate_Partition(struct HYD_Partition_list **partition);
HYD_Status HYDU_Create_process(char **client_arg, int *in, int *out,
int *err, int *pid);
HYD_Status HYDU_Create_process(char **client_arg, int *in, int *out, int *err, int *pid);
HYD_Status HYDU_Dump_args(char **args);
#endif /* HYDRA_LAUNCH_H_INCLUDED */
......@@ -19,8 +19,7 @@
#define size_t unsigned int
#endif /* size_t */
HYD_Status HYDU_Sock_listen(int *listen_fd, char *port_range,
uint16_t * port);
HYD_Status HYDU_Sock_listen(int *listen_fd, char *port_range, uint16_t * port);
HYD_Status HYDU_Sock_connect(const char *host, uint16_t port, int *fd);
HYD_Status HYDU_Sock_accept(int listen_fd, int *fd);
HYD_Status HYDU_Sock_readline(int fd, char *buf, int maxlen, int *linelen);
......@@ -29,5 +28,8 @@ HYD_Status HYDU_Sock_writeline(int fd, char *buf, int maxsize);
HYD_Status HYDU_Sock_write(int fd, char *buf, int maxsize);
HYD_Status HYDU_Sock_set_nonblock(int fd);
HYD_Status HYDU_Sock_set_cloexec(int fd);
HYD_Status HYDU_Sock_stdout_cb(int fd, HYD_Event_t events, int stdout_fd, int *closed);
HYD_Status HYDU_Sock_stdin_cb(int fd, HYD_Event_t events, char *buf, int *buf_count,
int *buf_offset, int *closed);
#endif /* HYDRA_SOCKS_H_INCLUDED */
......@@ -13,45 +13,27 @@ HYD_Handle handle;
HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
{
int count;
char buf[HYD_TMPBUF_SIZE];
int closed;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
if (events & HYD_STDIN) {
HYDU_Error_printf("stdout handler got an stdin event: %d\n",
events);
status = HYD_INTERNAL_ERROR;
/* Write output to fd 1 */
status = HYDU_Sock_stdout_cb(fd, events, 1, &closed);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("socket stdout callback error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
count = read(fd, buf, HYD_TMPBUF_SIZE);
if (count < 0) {
HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd,
errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
else if (count == 0) {
/* The connection has closed */
if (closed) {
status = HYD_CSI_Close_fd(fd);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n",
fd, errno);
HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
goto fn_exit;
}
count = write(1, buf, count);
if (count < 0) {
HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd,
errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
fn_exit:
HYDU_FUNC_EXIT();
return status;
......@@ -63,45 +45,28 @@ HYD_Status HYD_LCHI_stdout_cb(int fd, HYD_Event_t events)
HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
{
int count;
int count, closed;
char buf[HYD_TMPBUF_SIZE];
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
if (events & HYD_STDIN) {
HYDU_Error_printf("stderr handler got an stdin event: %d\n",
events);
status = HYD_INTERNAL_ERROR;
/* Write output to fd 2 */
status = HYDU_Sock_stdout_cb(fd, events, 2, &closed);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("socket stdout callback error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
count = read(fd, buf, HYD_TMPBUF_SIZE);
if (count < 0) {
HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n", fd,
errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
else if (count == 0) {
/* The connection has closed */
if (closed) {
status = HYD_CSI_Close_fd(fd);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n",
fd, errno);
HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
goto fn_exit;
}
count = write(2, buf, count);
if (count < 0) {
HYDU_Error_printf("socket write error on fd: %d (errno: %d)\n", fd,
errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
fn_exit:
HYDU_FUNC_EXIT();
return status;
......@@ -113,64 +78,25 @@ HYD_Status HYD_LCHI_stderr_cb(int fd, HYD_Event_t events)
HYD_Status HYD_LCHI_stdin_cb(int fd, HYD_Event_t events)
{
int count;
int count, closed;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
if (events & HYD_STDIN) {
HYDU_Error_printf
("stdin handler got a writeable event on local stdin: %d\n",
events);
status = HYD_INTERNAL_ERROR;
status = HYDU_Sock_stdin_cb(fd, events, handle.stdin_tmp_buf,
&handle.stdin_buf_count, &handle.stdin_buf_offset, &closed);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("sock stdin callback returned an error\n");
status = HYD_SOCK_ERROR;
goto fn_fail;
}
while (1) {
/* If we already have buffered data, send it out */
if (handle.stdin_buf_count) {
count =
write(handle.in,
handle.stdin_tmp_buf + handle.stdin_buf_offset,
handle.stdin_buf_count);
if (count < 0) {
/* We can't get an EAGAIN as we just got out of poll */
HYDU_Error_printf
("socket write error on fd: %d (errno: %d)\n",
handle.in, errno);
status = HYD_SOCK_ERROR;
goto fn_fail;
}
handle.stdin_buf_offset += count;
handle.stdin_buf_count -= count;
break;
}
/* If we are still here, we need to refill our temporary buffer */
count = read(0, handle.stdin_tmp_buf, HYD_TMPBUF_SIZE);
if (count < 0) {
if (errno == EINTR || errno == EAGAIN) {
/* This call was interrupted or there was no data to read; just break out. */
break;
}
HYDU_Error_printf("socket read error on fd: %d (errno: %d)\n",
fd, errno);
status = HYD_SOCK_ERROR;
if (closed) {
status = HYD_CSI_Close_fd(fd);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("socket close error on fd: %d (errno: %d)\n", fd, errno);
goto fn_fail;
}
else if (count == 0) {
/* The connection has closed */
status = HYD_CSI_Close_fd(fd);
if (status != HYD_SUCCESS) {
HYDU_Error_printf
("socket close error on fd: %d (errno: %d)\n", fd,
errno);
goto fn_fail;
}
break;
}
handle.stdin_buf_count += count;
}
fn_exit:
......
......@@ -17,36 +17,25 @@ HYD_Handle handle;
static void usage(void)
{
printf("\n");
printf
("Usage: ./mpiexec [global opts] [exec1 local opts] : [exec2 local opts] : ...\n\n");
printf("Usage: ./mpiexec [global opts] [exec1 local opts] : [exec2 local opts] : ...\n\n");
printf("Global Options (passed to all executables):\n");
printf("\t-v/-vv/-vvv [Verbose level]\n");
printf
("\t--enable-x/--disable-x [Enable or disable X forwarding]\n");
printf
("\t-genv {name} {value} [Environment variable name and value]\n");
printf
("\t-genvlist {env1,env2,...} [Environment variable list to pass]\n");
printf
("\t-genvnone [Do not pass any environment variables]\n");
printf
("\t-genvall [Pass all environment variables (default)]\n");
printf("\t--enable-x/--disable-x [Enable or disable X forwarding]\n");
printf("\t-genv {name} {value} [Environment variable name and value]\n");
printf("\t-genvlist {env1,env2,...} [Environment variable list to pass]\n");
printf("\t-genvnone [Do not pass any environment variables]\n");
printf("\t-genvall [Pass all environment variables (default)]\n");
printf("\n");
printf("Local Options (passed to individual executables):\n");
printf("\t-n/-np {value} [Number of processes]\n");
printf
("\t-f {name} [File containing the host names]\n");
printf
("\t-env {name} {value} [Environment variable name and value]\n");
printf
("\t-envlist {env1,env2,...} [Environment variable list to pass]\n");
printf
("\t-envnone [Do not pass any environment variables]\n");
printf
("\t-envall [Pass all environment variables (default)]\n");
printf("\t-f {name} [File containing the host names]\n");
printf("\t-env {name} {value} [Environment variable name and value]\n");
printf("\t-envlist {env1,env2,...} [Environment variable list to pass]\n");
printf("\t-envnone [Do not pass any environment variables]\n");
printf("\t-envall [Pass all environment variables (default)]\n");
printf
("\t{exec_name} {args} [Name of the executable to run and its arguments]\n");
......@@ -110,8 +99,7 @@ int main(int argc, char **argv)
/* Launch the processes */
status = HYD_CSI_Launch_procs();
if (status != HYD_SUCCESS) {
HYDU_Error_printf