Commit 6cb99a56 authored by Pavan Balaji's avatar Pavan Balaji
Browse files

[svn-r3433] Several fixes and cleanups to the environment variable management code.

parent 386f000b
......@@ -12,8 +12,8 @@
HYD_Status HYD_BSCI_Launch_procs(void);
HYD_Status HYD_BSCI_Cleanup_procs(void);
HYD_Status HYD_BSCI_Get_universe_size(int * size);
HYD_Status HYD_BSCI_Wait_for_completion(void);
HYD_Status HYD_BSCI_Finalize(void);
HYD_Status HYD_BSCI_Get_universe_size(int * size);
#endif /* BSCI_H_INCLUDED */
......@@ -17,7 +17,7 @@ HYD_CSI_Handle * csi_handle;
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_BSCI_Finalize"
HYD_Status HYD_BSCI_Finalize()
HYD_Status HYD_BSCI_Finalize(void)
{
struct HYD_CSI_Proc_params * proc_params;
HYD_Status status = HYD_SUCCESS;
......
......@@ -12,12 +12,7 @@
#include "bsci.h"
#include "bscu.h"
#define MAX_CLIENT_ARG 200
#define MAX_CLIENT_ENV 200
HYD_BSCU_Procstate_t * HYD_BSCU_Procstate;
int HYD_BSCU_Num_procs;
int HYD_BSCU_Completed_procs;
HYD_CSI_Handle * csi_handle;
/*
......@@ -30,17 +25,16 @@ HYD_CSI_Handle * csi_handle;
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_BSCI_Launch_procs"
HYD_Status HYD_BSCI_Launch_procs()
HYD_Status HYD_BSCI_Launch_procs(void)
{
struct HYD_CSI_Proc_params * proc_params;
char * hostname, ** client_arg, ** client_env;
int i, arg, process_id;
char ** client_arg, * hostname, ** proc_list;
int i, arg, process_id, host_id, host_id_max;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
HYDU_MALLOC(client_arg, char **, MAX_CLIENT_ARG * sizeof(char *), status);
HYDU_MALLOC(client_env, char **, MAX_CLIENT_ENV * sizeof(char *), status);
HYDU_MALLOC(client_arg, char **, HYD_CSI_EXEC_ARGS * sizeof(char *), status);
status = HYD_BSCU_Init_exit_status();
if (status != HYD_SUCCESS) {
......@@ -55,19 +49,18 @@ HYD_Status HYD_BSCI_Launch_procs()
}
proc_params = csi_handle->proc_params;
hostname = NULL;
process_id = 0;
while (proc_params) {
HYDU_MALLOC(proc_params->stdout, int *, proc_params->hostlist_length * sizeof(int), status);
HYDU_MALLOC(proc_params->stderr, int *, proc_params->hostlist_length * sizeof(int), status);
for (i = 0; i < proc_params->hostlist_length; i++) {
if (hostname == NULL || proc_params->hostlist[i] != NULL) {
hostname = proc_params->hostlist[i];
}
HYDU_MALLOC(proc_params->stdout, int *, proc_params->user_num_procs * sizeof(int), status);
HYDU_MALLOC(proc_params->stderr, int *, proc_params->user_num_procs * sizeof(int), status);
HYD_BSCU_Setup_env(proc_params, client_env, process_id, status);
if (proc_params->host_file != NULL) { /* We got a new host file */
host_id = 0;
host_id_max = proc_params->total_num_procs;
proc_list = proc_params->total_proc_list;
}
for (i = 0; i < proc_params->user_num_procs; i++) {
/* Setup the executable arguments */
arg = 0;
client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
......@@ -80,19 +73,30 @@ HYD_Status HYD_BSCI_Launch_procs()
else /* default mode is disable X */
client_arg[arg++] = MPIU_Strdup("-xq");
if (host_id == host_id_max)
host_id = 0;
hostname = proc_list[host_id];
host_id++;
client_arg[arg++] = MPIU_Strdup(hostname);
client_arg[arg++] = NULL;
HYD_BSCU_Append_env(proc_params, client_env, client_arg, arg, -1);
HYD_BSCU_Append_env(csi_handle->system_env, client_arg, process_id);
HYD_BSCU_Append_env(proc_params->prop_env, client_arg, process_id);
for (arg = 0; client_arg[arg]; arg++);
client_arg[arg++] = MPIU_Strdup("cd");
client_arg[arg++] = MPIU_Strdup(csi_handle->wdir);
client_arg[arg++] = MPIU_Strdup(";");
HYD_BSCU_Append_exec(proc_params, client_arg, arg, -1);
client_arg[arg++] = NULL;
HYD_BSCU_Append_exec(proc_params->exec, client_arg);
/* The stdin pointer will be some value for process_id 0;
* for everyone else, it's NULL. */
status = HYD_BSCU_Spawn_proc(client_arg, client_env, (process_id == 0 ? &csi_handle->stdin : NULL),
&proc_params->stdout[i], &proc_params->stderr[i],
&HYD_BSCU_Procstate[process_id].pid);
status = HYD_BSCU_Create_process(client_arg, (process_id == 0 ? &csi_handle->stdin : NULL),
&proc_params->stdout[i], &proc_params->stderr[i],
&HYD_BSCU_Procstate[process_id].pid);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
......@@ -113,10 +117,6 @@ fn_exit:
HYDU_FREE(client_arg[arg]);
HYDU_FREE(client_arg);
for (arg = 0; client_env[arg]; arg++)
HYDU_FREE(client_env[arg]);
HYDU_FREE(client_env);
HYDU_FUNC_EXIT();
return status;
......@@ -132,28 +132,33 @@ fn_fail:
HYD_Status HYD_BSCI_Cleanup_procs(void)
{
struct HYD_CSI_Proc_params * proc_params;
char * hostname, ** client_arg, ** client_env;
int i, arg, process_id, current_count, pid;
char ** client_arg, * hostname, ** proc_list;
int i, arg, host_id, host_id_max;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
HYDU_MALLOC(client_arg, char **, MAX_CLIENT_ARG * sizeof(char *), status);
HYDU_MALLOC(client_env, char **, MAX_CLIENT_ENV * sizeof(char *), status);
HYDU_MALLOC(client_arg, char **, HYD_CSI_EXEC_ARGS * sizeof(char *), status);
proc_params = csi_handle->proc_params;
hostname = NULL;
process_id = 0;
while (proc_params) {
for (i = 0; i < proc_params->hostlist_length; i++) {
if (hostname == NULL || proc_params->hostlist[i] != NULL) {
hostname = proc_params->hostlist[i];
}
for (i = 0; i < proc_params->user_num_procs; i++) {
/* Setup the executable arguments */
arg = 0;
client_arg[arg++] = MPIU_Strdup("ssh");
client_arg[arg++] = MPIU_Strdup("/usr/bin/ssh");
client_arg[arg++] = MPIU_Strdup("-xq");
if (proc_params->host_file != NULL) { /* We got a new host file */
host_id = 0;
host_id_max = proc_params->total_num_procs;
proc_list = proc_params->total_proc_list;
}
else if (host_id == host_id_max) {
host_id = 0;
}
hostname = proc_list[host_id];
host_id++;
client_arg[arg++] = MPIU_Strdup(hostname);
client_arg[arg++] = MPIU_Strdup("cd");
......@@ -161,16 +166,12 @@ HYD_Status HYD_BSCI_Cleanup_procs(void)
client_arg[arg++] = MPIU_Strdup(";");
client_arg[arg++] = MPIU_Strdup("killall");
client_arg[arg++] = NULL;
pid = HYD_BSCU_Procstate[process_id].pid;
process_id++;
if (pid == -1)
continue;
HYD_BSCU_Append_exec(proc_params, client_arg, arg, 1);
client_env[0] = NULL;
proc_params->exec[1] = 0; /* We only care about the executable name */
HYD_BSCU_Append_exec(proc_params->exec, client_arg);
status = HYD_BSCU_Spawn_proc(client_arg, client_env, NULL, NULL, NULL, NULL);
status = HYD_BSCU_Create_process(client_arg, NULL, NULL, NULL, NULL);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("bootstrap spawn process returned error\n");
goto fn_fail;
......@@ -185,10 +186,6 @@ fn_exit:
HYDU_FREE(client_arg[arg]);
HYDU_FREE(client_arg);
for (arg = 0; client_env[arg]; arg++)
HYDU_FREE(client_env[arg]);
HYDU_FREE(client_env);
HYDU_FUNC_EXIT();
return status;
......
......@@ -18,7 +18,7 @@
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_BSCI_Wait_for_completion"
HYD_Status HYD_BSCI_Wait_for_completion()
HYD_Status HYD_BSCI_Wait_for_completion(void)
{
HYD_Status status = HYD_SUCCESS;
......
......@@ -12,64 +12,6 @@
#include "csi.h"
#include "bsci.h"
#define HYD_BSCU_Append_exec(proc_params, client_arg, arg, num_args) \
{ \
struct HYD_CSI_Exec * exec; \
int count = 0; \
exec = proc_params->exec; \
while (exec) { \
client_arg[arg++] = MPIU_Strdup(exec->arg); \
if (++count >= num_args && num_args != -1) \
break; \
exec = exec->next; \
} \
client_arg[arg] = NULL; \
}
#define HYD_BSCU_Append_env(proc_params, client_env, client_arg, arg, num_args) \
{ \
HYDU_Env_t * env; \
int count = 0; \
while (client_env[count]) { \
client_arg[arg++] = MPIU_Strdup("export"); \
client_arg[arg++] = MPIU_Strdup(client_env[count]); \
client_arg[arg++] = MPIU_Strdup(";"); \
if (++count >= num_args && num_args != -1) \
break; \
} \
client_arg[arg] = NULL; \
}
#define HYD_BSCU_Setup_env(proc_params, client_env, process_id, status) \
{ \
int arg = 0, len; \
char * env_value, * str; \
HYDU_Env_t * env; \
env = proc_params->env_list; \
while (env) { \
if (env->env_type == HYDU_ENV_STATIC) \
env_value = env->env_value; \
else { /* This is an auto-increment type */ \
/* Allocate a small buffer; this is only an integer */ \
HYDU_Int_to_str(process_id, str, status); \
HYDU_MALLOC(env_value, char *, strlen(str) + 1, status); \
MPIU_Snprintf(env_value, strlen(str) + 1, "%s", str); \
HYDU_FREE(str); \
} \
\
len = strlen(env->env_name) + 2; \
if (env_value) \
len += strlen(env_value); \
\
HYDU_MALLOC(client_env[arg], char *, len + 1, status); \
MPIU_Snprintf(client_env[arg++], len + 1, "%s=%s", env->env_name, env_value); \
if (env->env_type == HYDU_ENV_AUTOINC) \
HYDU_FREE(env_value); \
env = env->next; \
} \
client_env[arg] = NULL; \
}
typedef struct HYD_BSCU_Procstate {
int pid;
int exit_status;
......@@ -81,8 +23,10 @@ extern int HYD_BSCU_Completed_procs;
HYD_Status HYD_BSCU_Init_exit_status(void);
HYD_Status HYD_BSCU_Finalize_exit_status(void);
HYD_Status HYD_BSCU_Spawn_proc(char ** client_arg, char ** client_env, int * stdin, int * stdout, int * stderr, int * pid);
HYD_Status HYD_BSCU_Create_process(char ** client_arg, int * in, int * out, int * err, int * pid);
HYD_Status HYD_BSCU_Wait_for_completion(void);
HYD_Status HYD_BSCU_Append_env(HYDU_Env_t * env_list, char ** client_arg, int id);
HYD_Status HYD_BSCU_Append_exec(char ** exec, char ** client_arg);
HYD_Status HYD_BSCU_Set_common_signals(sighandler_t handler);
void HYD_BSCU_Signal_handler(int signal);
......
......@@ -34,9 +34,9 @@ HYD_Status HYD_BSCU_Init_exit_status(void)
HYD_BSCU_Num_procs = 0;
proc_params = csi_handle->proc_params;
while (proc_params) {
HYD_BSCU_Num_procs += proc_params->hostlist_length;
HYDU_MALLOC(proc_params->exit_status, int *, proc_params->hostlist_length * sizeof(int), status);
for (i = 0; i < proc_params->hostlist_length; i++)
HYD_BSCU_Num_procs += proc_params->user_num_procs;
HYDU_MALLOC(proc_params->exit_status, int *, proc_params->user_num_procs * sizeof(int), status);
for (i = 0; i < proc_params->user_num_procs; i++)
proc_params->exit_status[i] = 1;
proc_params = proc_params->next;
}
......@@ -86,9 +86,9 @@ fn_fail:
#if defined FUNCNAME
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_BSCU_Spawn_proc"
HYD_Status HYD_BSCU_Spawn_proc(char ** client_arg, char ** client_env,
int * in, int * out, int * err, int * pid)
#define FUNCNAME "HYD_BSCU_Create_process"
HYD_Status HYD_BSCU_Create_process(char ** client_arg, int * in, int * out,
int * err, int * pid)
{
int inpipe[2], outpipe[2], errpipe[2], arg, tpid;
HYD_Status status = HYD_SUCCESS;
......@@ -243,3 +243,76 @@ fn_exit:
fn_fail:
goto fn_exit;
}
#if defined FUNCNAME
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_BSCU_Append_env"
HYD_Status HYD_BSCU_Append_env(HYDU_Env_t * env_list, char ** client_arg, int id)
{
int i, j;
HYDU_Env_t * env;
char * envstr, * tmp[HYDU_NUM_JOIN_STR], * inc;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
for (i = 0; client_arg[i]; i++);
env = env_list;
while (env) {
client_arg[i++] = MPIU_Strdup("export");
j = 0;
tmp[j++] = MPIU_Strdup(env->env_name);
tmp[j++] = MPIU_Strdup("=");
if (env->env_type == HYDU_ENV_STATIC)
tmp[j++] = MPIU_Strdup(env->env_value);
else if (env->env_type == HYDU_ENV_AUTOINC) {
HYDU_Int_to_str(env->start_val + id, inc, status);
tmp[j++] = inc;
}
tmp[j++] = NULL;
HYDU_STR_ALLOC_AND_JOIN(tmp, envstr, status);
client_arg[i++] = envstr;
client_arg[i++] = MPIU_Strdup(";");
env = env->next;
}
client_arg[i++] = NULL;
fn_exit:
HYDU_FUNC_EXIT();
return status;
fn_fail:
goto fn_exit;
}
#if defined FUNCNAME
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_BSCU_Append_exec"
HYD_Status HYD_BSCU_Append_exec(char ** exec, char ** client_arg)
{
int i, j;
HYD_Status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
for (i = 0; client_arg[i]; i++);
for (j = 0; exec[j]; j++)
client_arg[i++] = MPIU_Strdup(exec[j]);
client_arg[i++] = NULL;
fn_exit:
HYDU_FUNC_EXIT();
return status;
fn_fail:
goto fn_exit;
}
......@@ -36,7 +36,7 @@ HYD_Status HYD_CSI_Close_fd(int fd)
/* Find the FD in the handle and remove it. */
proc_params = csi_handle->proc_params;
while (proc_params) {
for (i = 0; i < proc_params->hostlist_length; i++) {
for (i = 0; i < proc_params->user_num_procs; i++) {
if (proc_params->stdout[i] == fd) {
proc_params->stdout[i] = -1;
goto fn_exit;
......
......@@ -17,7 +17,7 @@ HYD_CSI_Handle * csi_handle;
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_CSI_Finalize"
HYD_Status HYD_CSI_Finalize()
HYD_Status HYD_CSI_Finalize(void)
{
struct HYD_CSI_Proc_params * proc_params, * p;
int i;
......@@ -43,23 +43,6 @@ HYD_Status HYD_CSI_Finalize()
goto fn_fail;
}
/* Clean up the control system structures now */
HYDU_FREE(csi_handle->wdir);
proc_params = csi_handle->proc_params;
while (proc_params) {
p = proc_params->next;
HYD_CSU_Free_env_list(proc_params->env_list);
HYD_CSU_Free_env_list(proc_params->genv_list);
HYD_CSU_Free_exec_list(proc_params->exec);
HYDU_FREE(proc_params->corelist);
for (i = 0; i < proc_params->hostlist_length; i++)
HYDU_FREE(proc_params->hostlist[i]);
HYDU_FREE(proc_params->hostlist);
HYDU_FREE(proc_params);
proc_params = p;
}
csi_handle->proc_params = NULL;
fn_exit:
HYDU_FUNC_EXIT();
return status;
......
......@@ -19,7 +19,7 @@ HYD_CSI_Handle * csi_handle;
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_CSI_Launch_procs"
HYD_Status HYD_CSI_Launch_procs()
HYD_Status HYD_CSI_Launch_procs(void)
{
struct HYD_CSI_Proc_params * proc_params;
int stdin_fd, flags, count;
......@@ -35,14 +35,14 @@ HYD_Status HYD_CSI_Launch_procs()
proc_params = csi_handle->proc_params;
while (proc_params) {
status = HYD_DMX_Register_fd(proc_params->hostlist_length, proc_params->stdout,
status = HYD_DMX_Register_fd(proc_params->user_num_procs, proc_params->stdout,
HYD_CSI_OUT, proc_params->stdout_cb);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("demux engine returned error when registering fd\n");
goto fn_fail;
}
status = HYD_DMX_Register_fd(proc_params->hostlist_length, proc_params->stderr,
status = HYD_DMX_Register_fd(proc_params->user_num_procs, proc_params->stderr,
HYD_CSI_OUT, proc_params->stderr_cb);
if (status != HYD_SUCCESS) {
HYDU_Error_printf("demux engine returned error when registering fd\n");
......
......@@ -16,7 +16,7 @@ HYD_CSI_Handle * csi_handle;
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_CSI_Launch_procs"
HYD_Status HYD_CSI_Wait_for_completion()
HYD_Status HYD_CSI_Wait_for_completion(void)
{
int sockets_open, i;
struct HYD_CSI_Proc_params * proc_params;
......@@ -37,7 +37,7 @@ HYD_Status HYD_CSI_Wait_for_completion()
proc_params = csi_handle->proc_params;
sockets_open = 0;
while (proc_params) {
for (i = 0; i < proc_params->hostlist_length; i++) {
for (i = 0; i < proc_params->user_num_procs; i++) {
if (proc_params->stdout[i] != -1 || proc_params->stderr[i] != -1) {
sockets_open++;
break;
......
......@@ -16,23 +16,32 @@
typedef u_int16_t HYD_CSI_Event_t;
typedef struct HYD_CSI_Exec {
char * arg;
struct HYD_CSI_Exec * next;
} HYD_CSI_Exec_t;
#define HYD_CSI_TMPBUF_SIZE (64 * 1024)
#define HYD_CSI_EXEC_ARGS 200
typedef struct {
int debug_level;
int enablex;
char * wdir;
typedef enum {
HYD_CSI_PROP_NONE,
HYD_CSI_PROP_ENVALL,
HYD_CSI_PROP_ENVNONE,
HYD_CSI_PROP_ENVLIST
} HYD_CSI_Prop_t;
int stdin;
void * stdin_cb;
char stdin_tmp_buf[HYD_CSI_TMPBUF_SIZE];
int stdin_buf_offset;
int stdin_buf_count;
typedef struct {
int debug;
int enablex;
char * wdir;
HYDU_Env_t * global_env;
HYDU_Env_t * system_env;
HYDU_Env_t * user_env;
HYD_CSI_Prop_t prop;
HYDU_Env_t * prop_env;
int stdin;
void * stdin_cb;
char stdin_tmp_buf[HYD_CSI_TMPBUF_SIZE];
int stdin_buf_offset;
int stdin_buf_count;
/* Start time and timeout. These are filled in by the launcher,
* but are utilized by the demux engine and the boot-strap server
......@@ -43,19 +52,27 @@ typedef struct {
/* Each structure will contain all hosts/cores that use the same
* executable and environment. */
struct HYD_CSI_Proc_params {
int hostlist_length;
int user_num_procs;
int total_num_procs;
char ** total_proc_list;
int * total_core_list;
char ** hostlist;
int * corelist;
char * host_file;
HYD_CSI_Exec_t * exec;
HYDU_Env_t * env_list;
HYDU_Env_t * genv_list;
char * exec[HYD_CSI_EXEC_ARGS];
HYDU_Env_t * user_env;
HYD_CSI_Prop_t prop;
HYDU_Env_t * prop_env;
/* These output FDs are filled in by the lower layers */
int * stdout;
int * stderr;
/* Callback functions for the stdout/stderr events. These can
* be the same. */
void * stdout_cb;
void * stderr_cb;
/* Status > 0 means that it is not set yet. Successful
* completion of a process will set the status to 0. An error
* will set this to a negative value corresponding to the
......@@ -64,11 +81,6 @@ typedef struct {
* value for all processes. */
int * exit_status;
/* Callback functions for the stdout/stderr events. These can
* be the same. */
void * stdout_cb;
void * stderr_cb;
struct HYD_CSI_Proc_params * next;
} * proc_params;
} HYD_CSI_Handle;
......
......@@ -39,42 +39,3 @@ fn_exit:
fn_fail:
goto fn_exit;
}
#if defined FUNCNAME
#undef FUNCNAME
#endif /* FUNCNAME */
#define FUNCNAME "HYD_CSU_Free_env_list"
void HYD_CSU_Free_env_list(HYDU_Env_t * env_list)
{
HYDU_Env_t * env1, * env2;
env1 = env_list;
while (env1) {
env2 = env1->next;
if (env1->env_name)
HYDU_FREE(env1->env_name);
if (env1->env_value)
HYDU_FREE(env1->env_value);
HYDU_FREE(env1);
env1 = env2;
}
}
#if defined FUNCNAME
#undef FUNCNAME