Commit 5160e94f authored by Pavan Balaji
Browse files

[svn-r6578] For checkpointing code, use PMI_PORT instead of PMI_FD.

With BLCR, since all the processes are restarted by the BLCR library,
and not by Hydra directly, we cannot provide a new PMI_FD for the MPI
processes to use. So, we instead use the PMI_PORT mechanism and ask
the MPI processes to connect back.

This commit also contains a cleanup of the socket fd maintenance
within the code to distinguish between uninitialized sockets and
closed sockets. While this part is independent, parts of it overlapped
with getting the combined PMI_PORT/PMI_FD code working.
parent 20317f1b
......@@ -138,6 +138,12 @@ extern char **environ;
#define HYDRA_NAMESERVER_DEFAULT_PORT 6392
/*
 * Sentinel values stored in an fd slot in place of a real descriptor.
 * Two separate negative values are used so that a slot that was never
 * set up can be told apart from one that was opened and later closed.
 */
enum HYD_fd_state {
    HYD_FD_CLOSED = -2,         /* the descriptor has been closed */
    HYD_FD_UNSET = -1           /* the descriptor has not been initialized */
};
/* Status information */
typedef enum {
HYD_SUCCESS = 0,
......
......@@ -22,17 +22,18 @@ static HYD_status init_params(void)
HYD_pmcd_pmip.system_global.enable_stdin = -1;
HYD_pmcd_pmip.system_global.global_core_count = -1;
HYD_pmcd_pmip.system_global.global_process_count = -1;
HYD_pmcd_pmip.system_global.pmi_port = NULL;
HYD_pmcd_pmip.system_global.pmi_fd = NULL;
HYD_pmcd_pmip.system_global.pmi_rank = -1;
HYD_pmcd_pmip.system_global.pmi_process_mapping = NULL;
HYD_pmcd_pmip.upstream.server_name = NULL;
HYD_pmcd_pmip.upstream.server_port = -1;
HYD_pmcd_pmip.upstream.control = -1;
HYD_pmcd_pmip.upstream.control = HYD_FD_UNSET;
HYD_pmcd_pmip.downstream.out = NULL;
HYD_pmcd_pmip.downstream.err = NULL;
HYD_pmcd_pmip.downstream.in = -1;
HYD_pmcd_pmip.downstream.in = HYD_FD_UNSET;
HYD_pmcd_pmip.downstream.pid = NULL;
HYD_pmcd_pmip.downstream.exit_status = NULL;
HYD_pmcd_pmip.downstream.pmi_rank = NULL;
......@@ -73,6 +74,9 @@ static void cleanup_params(void)
if (HYD_pmcd_pmip.system_global.pmi_fd)
HYDU_FREE(HYD_pmcd_pmip.system_global.pmi_fd);
if (HYD_pmcd_pmip.system_global.pmi_port)
HYDU_FREE(HYD_pmcd_pmip.system_global.pmi_port);
if (HYD_pmcd_pmip.system_global.pmi_process_mapping)
HYDU_FREE(HYD_pmcd_pmip.system_global.pmi_process_mapping);
......@@ -220,9 +224,9 @@ int main(int argc, char **argv)
* are, we will just wait for more events. */
count = 0;
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
if (HYD_pmcd_pmip.downstream.out[i] != -1)
if (HYD_pmcd_pmip.downstream.out[i] != HYD_FD_CLOSED)
count++;
if (HYD_pmcd_pmip.downstream.err[i] != -1)
if (HYD_pmcd_pmip.downstream.err[i] != HYD_FD_CLOSED)
count++;
if (count)
......
......@@ -38,6 +38,7 @@ struct HYD_pmcd_pmip {
/* PMI */
char *pmi_fd;
char *pmi_port;
int pmi_rank; /* If this is -1, we auto-generate it */
char *pmi_process_mapping;
} system_global; /* Global system parameters */
......
......@@ -17,6 +17,7 @@ struct HYD_pmcd_pmip_pmi_handle *HYD_pmcd_pmip_pmi_handle = { 0 };
static int storage_len = 0;
static char storage[HYD_TMPBUF_SIZE], *sptr = storage, r[HYD_TMPBUF_SIZE];
static int using_pmi_port = 0;
static HYD_status stdio_cb(int fd, HYD_event_t events, void *userp)
{
......@@ -40,7 +41,7 @@ static HYD_status stdio_cb(int fd, HYD_event_t events, void *userp)
close(fd);
close(HYD_pmcd_pmip.downstream.in);
HYD_pmcd_pmip.downstream.in = -1;
HYD_pmcd_pmip.downstream.in = HYD_FD_CLOSED;
}
goto fn_exit;
......@@ -57,12 +58,12 @@ static HYD_status stdio_cb(int fd, HYD_event_t events, void *userp)
if (stdfd == STDOUT_FILENO) {
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
if (HYD_pmcd_pmip.downstream.out[i] == fd)
HYD_pmcd_pmip.downstream.out[i] = -1;
HYD_pmcd_pmip.downstream.out[i] = HYD_FD_CLOSED;
}
else {
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
if (HYD_pmcd_pmip.downstream.err[i] == fd)
HYD_pmcd_pmip.downstream.err[i] = -1;
HYD_pmcd_pmip.downstream.err[i] = HYD_FD_CLOSED;
}
close(fd);
......@@ -261,11 +262,13 @@ static HYD_status pmi_cb(int fd, HYD_event_t events, void *userp)
status = read_pmi_cmd(fd, &closed);
HYDU_ERR_POP(status, "unable to read PMI command\n");
/* Try to find the PMI FD */
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
if (HYD_pmcd_pmip.downstream.pmi_fd[i] == fd)
break;
HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);
/* If we used the PMI_FD format, try to find the PMI FD */
if (!using_pmi_port) {
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
if (HYD_pmcd_pmip.downstream.pmi_fd[i] == fd)
break;
HYDU_ASSERT(i < HYD_pmcd_pmip.local.proxy_process_count, status);
}
if (closed) {
/* This is a hack to improve user-friendliness. If a PMI
......@@ -275,13 +278,14 @@ static HYD_status pmi_cb(int fd, HYD_event_t events, void *userp)
* soon as we get the finalize message. For non-PMI
* applications, this is harder to identify, so we just let
* the user cleanup the processes on a failure. */
if (HYD_pmcd_pmip.downstream.pmi_fd_active[i])
if (using_pmi_port || HYD_pmcd_pmip.downstream.pmi_fd_active[i])
HYD_pmcd_pmip_killjob();
goto fn_exit;
}
/* This is a PMI application */
HYD_pmcd_pmip.downstream.pmi_fd_active[i] = 1;
if (!using_pmi_port)
HYD_pmcd_pmip.downstream.pmi_fd_active[i] = 1;
do {
status = check_pmi_cmd(&buf, &hdr.pmi_version, &repeat);
......@@ -428,13 +432,13 @@ fn_fail:
static HYD_status launch_procs(void)
{
int i, j, arg, stdin_fd, process_id, os_index, pmi_rank;
char *str, *envstr, *list;
char *str, *envstr, *list, *pmi_port;
char *client_args[HYD_NUM_TMP_STRINGS];
struct HYD_env *env, *opt_env = NULL, *force_env = NULL;
struct HYD_exec *exec;
enum HYD_pmcd_pmi_cmd cmd;
int *pmi_ranks;
int sent, closed, pmi_fds[2] = { -1, -1 };
int sent, closed, pmi_fds[2] = { HYD_FD_UNSET, HYD_FD_UNSET };
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
......@@ -467,9 +471,11 @@ static HYD_status launch_procs(void)
HYDU_MALLOC(HYD_pmcd_pmip.downstream.pmi_fd_active, int *,
HYD_pmcd_pmip.local.proxy_process_count * sizeof(int), status);
/* Initialize the PMI FD active state */
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
/* Initialize the PMI_FD and PMI FD active state */
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++) {
HYD_pmcd_pmip.downstream.pmi_fd[i] = HYD_FD_UNSET;
HYD_pmcd_pmip.downstream.pmi_fd_active[i] = 0;
}
/* Initialize the exit status */
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
......@@ -485,13 +491,18 @@ static HYD_status launch_procs(void)
HYD_pmcd_pmip.user_global.ckpoint_prefix);
HYDU_ERR_POP(status, "unable to initialize checkpointing\n");
if (HYD_pmcd_pmip.exec_list->exec[0] == NULL) { /* Checkpoint restart cast */
char *pmi_port;
status = HYDU_sock_create_and_listen_portstr(HYD_pmcd_pmip.user_global.iface,
NULL, &pmi_port, pmi_listen_cb, NULL);
HYDU_ERR_POP(status, "unable to create PMI port\n");
if (HYD_pmcd_pmip.system_global.pmi_port || HYD_pmcd_pmip.user_global.ckpoint_prefix) {
using_pmi_port = 1;
if (HYD_pmcd_pmip.system_global.pmi_port)
pmi_port = HYD_pmcd_pmip.system_global.pmi_port;
else {
status = HYDU_sock_create_and_listen_portstr(HYD_pmcd_pmip.user_global.iface,
NULL, &pmi_port, pmi_listen_cb, NULL);
HYDU_ERR_POP(status, "unable to create PMI port\n");
}
}
if (HYD_pmcd_pmip.exec_list->exec[0] == NULL) { /* Checkpoint restart cast */
status = HYDU_env_create(&env, "PMI_PORT", pmi_port);
HYDU_ERR_POP(status, "unable to create env\n");
......@@ -604,7 +615,6 @@ static HYD_status launch_procs(void)
for (i = 0; i < exec->proc_count; i++) {
/* PMI_RANK */
if (HYD_pmcd_pmip.system_global.pmi_rank == -1)
pmi_rank = HYDU_local_to_global_id(process_id,
HYD_pmcd_pmip.start_pid,
......@@ -616,47 +626,65 @@ static HYD_status launch_procs(void)
HYD_pmcd_pmip.downstream.pmi_rank[process_id] = pmi_rank;
str = HYDU_int_to_str(pmi_rank);
status = HYDU_env_create(&env, "PMI_RANK", str);
HYDU_ERR_POP(status, "unable to create env\n");
HYDU_FREE(str);
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
if (HYD_pmcd_pmip.system_global.pmi_port || HYD_pmcd_pmip.user_global.ckpoint_prefix) {
/* If a global PMI_PORT is provided, or this is a
* checkpointing case, use PMI_PORT format */
/* PMI_PORT */
status = HYDU_env_create(&env, "PMI_PORT", pmi_port);
HYDU_ERR_POP(status, "unable to create env\n");
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
/* PMI_FD */
if (HYD_pmcd_pmip.system_global.pmi_fd) {
/* If a global PMI port is provided, use it */
status = HYDU_env_create(&env, "PMI_FD", HYD_pmcd_pmip.system_global.pmi_fd);
/* PMI_ID */
str = HYDU_int_to_str(pmi_rank);
status = HYDU_env_create(&env, "PMI_ID", str);
HYDU_ERR_POP(status, "unable to create env\n");
HYDU_FREE(str);
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
}
else {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pmi_fds) < 0)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "pipe error\n");
status = HYDT_dmx_register_fd(1, &pmi_fds[0], HYD_POLLIN, NULL, pmi_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
str = HYDU_int_to_str(pmi_fds[1]);
status = HYDU_env_create(&env, "PMI_FD", str);
/* PMI_RANK */
str = HYDU_int_to_str(pmi_rank);
status = HYDU_env_create(&env, "PMI_RANK", str);
HYDU_ERR_POP(status, "unable to create env\n");
HYDU_FREE(str);
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
HYD_pmcd_pmip.downstream.pmi_fd[process_id] = pmi_fds[0];
}
/* PMI_FD */
if (HYD_pmcd_pmip.system_global.pmi_fd) {
/* If a global PMI port is provided, use it */
status = HYDU_env_create(&env, "PMI_FD", HYD_pmcd_pmip.system_global.pmi_fd);
HYDU_ERR_POP(status, "unable to create env\n");
}
else {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, pmi_fds) < 0)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "pipe error\n");
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
status = HYDT_dmx_register_fd(1, &pmi_fds[0], HYD_POLLIN, NULL, pmi_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
str = HYDU_int_to_str(pmi_fds[1]);
status = HYDU_env_create(&env, "PMI_FD", str);
HYDU_ERR_POP(status, "unable to create env\n");
HYDU_FREE(str);
/* PMI_SIZE */
str = HYDU_int_to_str(HYD_pmcd_pmip.system_global.global_process_count);
status = HYDU_env_create(&env, "PMI_SIZE", str);
HYDU_ERR_POP(status, "unable to create env\n");
HYDU_FREE(str);
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
HYD_pmcd_pmip.downstream.pmi_fd[process_id] = pmi_fds[0];
}
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
/* PMI_SIZE */
str = HYDU_int_to_str(HYD_pmcd_pmip.system_global.global_process_count);
status = HYDU_env_create(&env, "PMI_SIZE", str);
HYDU_ERR_POP(status, "unable to create env\n");
HYDU_FREE(str);
status = HYDU_append_env_to_list(*env, &force_env);
HYDU_ERR_POP(status, "unable to add env to list\n");
}
for (j = 0, arg = 0; exec->exec[j]; j++)
client_args[arg++] = HYDU_strdup(exec->exec[j]);
......@@ -688,9 +716,9 @@ static HYD_status launch_procs(void)
HYDU_ERR_POP(status, "create process returned error\n");
}
if (pmi_fds[1] != -1) {
if (pmi_fds[1] != HYD_FD_UNSET) {
close(pmi_fds[1]);
pmi_fds[1] = -1;
pmi_fds[1] = HYD_FD_CLOSED;
}
process_id++;
......
......@@ -140,7 +140,7 @@ static HYD_status fn_initack(int fd, char *args[])
/* Store the PMI_ID to fd mapping */
for (i = 0; i < HYD_pmcd_pmip.local.proxy_process_count; i++)
if (HYD_pmcd_pmip.downstream.pmi_rank[i] == id)
HYD_pmcd_pmip.downstream.pmi_rank[i] = fd;
HYD_pmcd_pmip.downstream.pmi_fd[i] = fd;
i = 0;
tmp[i++] = HYDU_strdup("cmd=initack\ncmd=set size=");
......
......@@ -93,6 +93,11 @@ static HYD_status enable_stdin_fn(char *arg, char ***argv)
return HYDU_set_int_and_incr(arg, argv, &HYD_pmcd_pmip.system_global.enable_stdin);
}
/*
 * Argument handler registered for the "pmi-port" option.  Passes arg and
 * argv through to HYDU_set_str_and_incr() with
 * HYD_pmcd_pmip.system_global.pmi_port as the destination and returns
 * that call's status unchanged.
 */
static HYD_status pmi_port_fn(char *arg, char ***argv)
{
    HYD_status status;

    status = HYDU_set_str_and_incr(arg, argv, &HYD_pmcd_pmip.system_global.pmi_port);

    return status;
}
static HYD_status pmi_fd_fn(char *arg, char ***argv)
{
return HYDU_set_str_and_incr(arg, argv, &HYD_pmcd_pmip.system_global.pmi_fd);
......@@ -384,6 +389,7 @@ struct HYD_arg_match_table HYD_pmcd_pmip_match_table[] = {
{"enable-stdin", enable_stdin_fn, NULL},
/* Executable parameters */
{"pmi-port", pmi_port_fn, NULL},
{"pmi-fd", pmi_fd_fn, NULL},
{"pmi-rank", pmi_rank_fn, NULL},
{"pmi-kvsname", pmi_kvsname_fn, NULL},
......
......@@ -118,9 +118,9 @@ static HYD_status cleanup_proxy_connection(int fd, struct HYD_proxy *proxy)
HYDU_ERR_POP(status, "error deregistering fd\n");
close(fd);
/* Reset the control fd to -1, so when the fd is reused, we don't
* find the wrong proxy */
proxy->control_fd = -1;
/* Reset the control fd, so when the fd is reused, we don't find
* the wrong proxy */
proxy->control_fd = HYD_FD_CLOSED;
for (tproxy = pg->proxy_list; tproxy; tproxy = tproxy->next) {
if (tproxy->exit_status == NULL)
......@@ -131,19 +131,19 @@ static HYD_status cleanup_proxy_connection(int fd, struct HYD_proxy *proxy)
pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch;
/* If the PMI listen fd has been initialized, deregister it */
if (pg_scratch->pmi_listen_fd != HYD_PMCD_PMI_FD_UNSET) {
if (pg_scratch->pmi_listen_fd != HYD_FD_UNSET) {
status = HYDT_dmx_deregister_fd(pg_scratch->pmi_listen_fd);
HYDU_ERR_POP(status, "unable to deregister PMI listen fd\n");
close(pg_scratch->pmi_listen_fd);
pg_scratch->pmi_listen_fd = HYD_PMCD_PMI_FD_CLOSED;
pg_scratch->pmi_listen_fd = HYD_FD_CLOSED;
}
if (pg_scratch->control_listen_fd != HYD_PMCD_PMI_FD_UNSET) {
if (pg_scratch->control_listen_fd != HYD_FD_UNSET) {
status = HYDT_dmx_deregister_fd(pg_scratch->control_listen_fd);
HYDU_ERR_POP(status, "unable to deregister control listen fd\n");
close(pg_scratch->control_listen_fd);
}
pg_scratch->control_listen_fd = HYD_PMCD_PMI_FD_CLOSED;
pg_scratch->control_listen_fd = HYD_FD_CLOSED;
/* If this is the main PG, free the debugger PID list */
if (pg->pgid == 0)
......@@ -385,23 +385,23 @@ HYD_status HYD_pmcd_pmiserv_cleanup(void)
/* Close the control listen port, so new proxies cannot
* connect back */
pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch;
if (pg_scratch->control_listen_fd != HYD_PMCD_PMI_FD_UNSET) {
if (pg_scratch->control_listen_fd != HYD_FD_UNSET) {
status = HYDT_dmx_deregister_fd(pg_scratch->control_listen_fd);
HYDU_ERR_POP(status, "unable to deregister control listen fd\n");
close(pg_scratch->control_listen_fd);
}
pg_scratch->control_listen_fd = HYD_PMCD_PMI_FD_CLOSED;
pg_scratch->control_listen_fd = HYD_FD_CLOSED;
for (proxy = pg->proxy_list; proxy; proxy = proxy->next) {
/* The proxy has not been setup yet */
if (proxy->control_fd == -1)
if (proxy->control_fd == HYD_FD_UNSET)
continue;
status = HYDT_dmx_deregister_fd(proxy->control_fd);
HYDU_ERR_POP(status, "error deregistering fd\n");
close(proxy->control_fd);
proxy->control_fd = -1;
proxy->control_fd = HYD_FD_CLOSED;
}
}
......
......@@ -222,8 +222,7 @@ HYD_status HYD_pmci_launch_procs(void)
struct HYD_proxy *proxy;
struct HYD_node *node_list = NULL, *node, *tnode;
char *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL }, *control_port = NULL;
char *pmi_fd = NULL;
int pmi_rank = -1, enable_stdin, ret, node_count, i, *control_fd;
int enable_stdin, ret, node_count, i, *control_fd;
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
......@@ -231,23 +230,6 @@ HYD_status HYD_pmci_launch_procs(void)
status = HYDT_dmx_register_fd(1, &HYD_handle.cleanup_pipe[0], POLLIN, NULL, ui_cmd_cb);
HYDU_ERR_POP(status, "unable to register fd\n");
/* Initialize PMI */
ret = MPL_env2str("PMI_FD", (const char **) &pmi_fd);
if (ret) { /* PMI_FD already set */
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "someone else already set PMI FD\n");
pmi_fd = HYDU_strdup(pmi_fd);
ret = MPL_env2int("PMI_RANK", &pmi_rank);
if (!ret)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "PMI_FD set but not PMI_RANK\n");
}
else {
pmi_rank = -1;
}
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "PMI FD: %s; PMI ID: %d\n", pmi_fd, pmi_rank);
status = HYD_pmcd_pmi_alloc_pg_scratch(&HYD_handle.pg_list);
HYDU_ERR_POP(status, "error allocating pg scratch space\n");
......@@ -282,7 +264,7 @@ HYD_status HYD_pmci_launch_procs(void)
status = HYD_pmcd_pmi_fill_in_proxy_args(proxy_args, control_port, 0);
HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
status = HYD_pmcd_pmi_fill_in_exec_launch_info(pmi_fd, pmi_rank, &HYD_handle.pg_list);
status = HYD_pmcd_pmi_fill_in_exec_launch_info(&HYD_handle.pg_list);
HYDU_ERR_POP(status, "unable to fill in executable arguments\n");
status = HYDT_dmx_stdin_valid(&enable_stdin);
......@@ -290,14 +272,14 @@ HYD_status HYD_pmci_launch_procs(void)
HYDU_MALLOC(control_fd, int *, node_count * sizeof(int), status);
for (i = 0; i < node_count; i++)
control_fd[i] = -1;
control_fd[i] = HYD_FD_UNSET;
status = HYDT_bsci_launch_procs(proxy_args, node_list, control_fd, enable_stdin, stdout_cb,
stderr_cb);
HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n");
for (i = 0, proxy = HYD_handle.pg_list.proxy_list; proxy; proxy = proxy->next, i++)
if (control_fd[i] != -1) {
if (control_fd[i] != HYD_FD_UNSET) {
proxy->control_fd = control_fd[i];
status = HYDT_dmx_register_fd(1, &control_fd[i], HYD_POLLIN, (void *) (size_t) 0,
......@@ -308,8 +290,6 @@ HYD_status HYD_pmci_launch_procs(void)
HYDU_FREE(control_fd);
fn_exit:
if (pmi_fd)
HYDU_FREE(pmi_fd);
if (control_port)
HYDU_FREE(control_port);
HYDU_free_strlist(proxy_args);
......@@ -345,7 +325,7 @@ HYD_status HYD_pmci_wait_for_completion(int timeout)
for (pg = &HYD_handle.pg_list; pg; pg = pg->next) {
pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch;
while (pg_scratch->control_listen_fd != HYD_PMCD_PMI_FD_CLOSED) {
while (pg_scratch->control_listen_fd != HYD_FD_CLOSED) {
status = HYDT_dmx_wait_for_event(-1);
HYDU_ERR_POP(status, "error waiting for event\n");
}
......
......@@ -17,11 +17,6 @@ extern struct HYD_pmcd_pmi_handle *HYD_pmcd_pmi_v1;
/* PMI-2 specific definitions */
extern struct HYD_pmcd_pmi_handle *HYD_pmcd_pmi_v2;
/*
 * Sentinel values for PMI-related fd slots: one for a descriptor that
 * was never set up and one for a descriptor that has been closed.
 */
enum HYD_pmcd_pmi_fd_state {
    HYD_PMCD_PMI_FD_CLOSED = -2,        /* the descriptor has been closed */
    HYD_PMCD_PMI_FD_UNSET = -1          /* the descriptor has not been initialized */
};
struct HYD_pmcd_token_segment {
int start_idx;
int token_count;
......
......@@ -273,9 +273,9 @@ static HYD_status fn_spawn(int fd, int pid, int pgid, char *args[])
struct HYD_pmcd_token_segment *segment_list = NULL;
int token_count, i, j, k, pmi_rank = -1, new_pgid, total_spawns, offset;
int token_count, i, j, k, new_pgid, total_spawns, offset;
int argcnt, num_segments;
char *pmi_fd = NULL, *control_port, *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL };
char *control_port, *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL };
char *tmp[HYD_NUM_TMP_STRINGS];
HYD_status status = HYD_SUCCESS;
......@@ -511,23 +511,6 @@ static HYD_status fn_spawn(int fd, int pid, int pgid, char *args[])
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "Got a control port string of %s\n", control_port);
/* Initialize PMI */
ret = MPL_env2str("PMI_FD", (const char **) &pmi_fd);
if (ret) { /* PMI_FD already set */
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "someone else already set PMI port\n");
pmi_fd = HYDU_strdup(pmi_fd);
ret = MPL_env2int("PMI_RANK", &pmi_rank);
if (!ret)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "PMI_FD set but not PMI_RANK\n");
}
else {
pmi_rank = -1;
}
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "PMI port: %s; PMI ID: %d\n", pmi_fd, pmi_rank);
/* Go to the last PG */
for (pg = &HYD_handle.pg_list; pg->next; pg = pg->next);
......@@ -552,9 +535,8 @@ static HYD_status fn_spawn(int fd, int pid, int pgid, char *args[])
HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
HYDU_FREE(control_port);
status = HYD_pmcd_pmi_fill_in_exec_launch_info(pmi_fd, pmi_rank, pg);
status = HYD_pmcd_pmi_fill_in_exec_launch_info(pg);
HYDU_ERR_POP(status, "unable to fill in executable arguments\n");
HYDU_FREE(pmi_fd);
status = HYDT_bsci_launch_procs(proxy_args, node_list, NULL, 0, HYD_handle.stdout_cb,
HYD_handle.stderr_cb);
......
......@@ -389,7 +389,7 @@ static HYD_status fn_kvs_fence(int fd, int pid, int pgid, char *args[])
/* couldn't find the current process; find a NULL entry */
for (i = 0; i < proxy->pg->pg_process_count; i++)
if (pg_scratch->ecount[i].fd == -1)
if (pg_scratch->ecount[i].fd == HYD_FD_UNSET)
break;
pg_scratch->ecount[i].fd = fd;
......@@ -470,9 +470,9 @@ static HYD_status fn_spawn(int fd, int pid, int pgid, char *args[])
struct HYD_pmcd_token_segment *segment_list = NULL;
int token_count, i, j, k, pmi_rank = -1, new_pgid, offset;
int token_count, i, j, k, new_pgid, offset;
int argcnt, num_segments;
char *pmi_fd = NULL, *control_port, *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL };
char *control_port, *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL };
char *tmp[HYD_NUM_TMP_STRINGS];
HYD_status status = HYD_SUCCESS;
......@@ -694,23 +694,6 @@ static HYD_status fn_spawn(int fd, int pid, int pgid, char *args[])
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "Got a control port string of %s\n", control_port);
/* Initialize PMI */
ret = MPL_env2str("PMI_FD", (const char **) &pmi_fd);
if (ret) { /* PMI_FD already set */
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "someone else already set PMI port\n");
pmi_fd = HYDU_strdup(pmi_fd);
ret = MPL_env2int("PMI_RANK", &pmi_rank);
if (!ret)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "PMI_FD set but not PMI_RANK\n");
}
else {
pmi_rank = -1;
}
if (HYD_handle.user_global.debug)
HYDU_dump(stdout, "PMI port: %s; PMI ID: %d\n", pmi_fd, pmi_rank);
/* Go to the last PG */
for (pg = &HYD_handle.pg_list; pg->next; pg = pg->next);
......@@ -735,9 +718,8 @@ static HYD_status fn_spawn(int fd, int pid, int pgid, char *args[])
HYDU_ERR_POP(status, "unable to fill in proxy arguments\n");
HYDU_FREE(control_port);
status = HYD_pmcd_pmi_fill_in_exec_launch_info(pmi_fd, pmi_rank, pg);
status = HYD_pmcd_pmi_fill_in_exec_launch_info(pg);
HYDU_ERR_POP(status, "unable to fill in executable arguments\n");
HYDU_FREE(pmi_fd);
status = HYDT_bsci_launch_procs(proxy_args, node_list, NULL, 0, HYD_handle.stdout_cb,
HYD_handle.stderr_cb);
......
......@@ -96,7 +96,7 @@ HYD_status HYD_pmcd_pmi_fill_in_proxy_args(char **proxy_args, char *control_port
goto fn_exit;
}
HYD_status HYD_pmcd_pmi_fill_in_exec_launch_info(char