Commit a788acc4 authored by Pavan Balaji's avatar Pavan Balaji
Browse files

[svn-r9000] Revert previous changes for handling the case where the RMK is not PBS (node list is custom).

Reviewed by chan.
parent af3e9e65
......@@ -12,21 +12,13 @@
#if defined(HAVE_TM_H)
#include "tm.h"
#define HYDT_PBS_STRLEN 160
/* One entry of the PBS node table: a TM node id paired with a node name.
 * HYDI_pbs_parse_for_nodes() fills one entry per line of the node file and
 * cmp_pbsnode() orders entries by name. */
struct HYDT_bscd_pbs_node {
tm_node_id id; /* Assigned the entry's line index when the node file is parsed */
char name[HYDT_PBS_STRLEN]; /* Node name, white space trimmed, at most HYDT_PBS_STRLEN bytes */
};
/* Bookkeeping for the PBS (TM) launcher; a single instance is reached
 * through the global pointer HYDT_bscd_pbs_sys.
 * NOTE(review): the members tm_root .. taskobits are listed twice below.
 * This looks like the old and new sides of a diff hunk shown together
 * rather than an intentional layout -- confirm against the real header. */
struct HYDT_bscd_pbs_sys {
struct tm_roots tm_root;
int spawned_count;
int size;
tm_task_id *taskIDs; /* Array of TM task(process) IDs */
tm_event_t *events; /* Array of TM event IDs */
int *taskobits; /* Array of TM task exit codes */
int num_nodes; /* Number of elements in nodes[] */
struct HYDT_bscd_pbs_node *nodes; /* Array of pbs_nodes[] */
struct tm_roots tm_root;
int spawned_count;
int size;
tm_task_id *taskIDs; /* Array of TM task(process) IDs */
tm_event_t *events; /* Array of TM event IDs */
int *taskobits; /* Array of TM task exit codes */
};
extern struct HYDT_bscd_pbs_sys *HYDT_bscd_pbs_sys;
......
......@@ -29,8 +29,6 @@ HYD_status HYDT_bscd_pbs_launcher_finalize(void)
HYDU_FREE(HYDT_bscd_pbs_sys->events);
if (HYDT_bscd_pbs_sys->taskIDs)
HYDU_FREE(HYDT_bscd_pbs_sys->taskIDs);
if (HYDT_bscd_pbs_sys->nodes)
HYDU_FREE(HYDT_bscd_pbs_sys->nodes);
HYDU_FREE(HYDT_bscd_pbs_sys);
}
......
......@@ -13,8 +13,8 @@ struct HYDT_bscd_pbs_sys *HYDT_bscd_pbs_sys;
HYD_status HYDT_bsci_launcher_pbs_init(void)
{
int ierr;
HYD_status status = HYD_SUCCESS;
int ierr;
HYDU_FUNC_ENTER();
......@@ -31,16 +31,13 @@ HYD_status HYDT_bsci_launcher_pbs_init(void)
* the number of processes allocated in this PBS job. */
ierr = tm_init(NULL, &(HYDT_bscd_pbs_sys->tm_root));
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_init() fails with TM err=%d.\n",
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "tm_init() fails with TM err=%d.\n",
ierr);
HYDT_bscd_pbs_sys->spawned_count = 0;
HYDT_bscd_pbs_sys->size = 0;
HYDT_bscd_pbs_sys->taskIDs = NULL;
HYDT_bscd_pbs_sys->events = NULL;
HYDT_bscd_pbs_sys->taskobits = NULL;
HYDT_bscd_pbs_sys->num_nodes = 0;
HYDT_bscd_pbs_sys->nodes = NULL;
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -8,126 +8,24 @@
#include "bsci.h"
#include "pbs.h"
/* #define TS_PROFILE 1 */
#if defined(TS_PROFILE)
#include <sys/time.h>
/* Prototype, so the definition below is preceded by a declaration. */
double TS_Wtime(void);

/*
 * Wall-clock timestamp used by the TS_PROFILE timing code.
 *
 * Returns the time reported by gettimeofday() as seconds, with the
 * microsecond field folded in as the fractional part.
 */
double TS_Wtime(void)
{
    struct timeval now;

    gettimeofday(&now, NULL);

    return (double) now.tv_sec + (double) now.tv_usec * 0.000001;
}
#endif
/*
 * Strip leading and trailing white space from str, in place.
 *
 * The byte following the last non-white-space character is overwritten
 * with '\0', and a pointer to the first non-white-space character (inside
 * the caller's buffer) is returned.
 *
 * Returns NULL, leaving str unmodified, when str is NULL, empty, or made up
 * entirely of white space; callers must check for NULL before using the
 * result as a string.
 *
 * The earlier version stopped its forward scan one character short, so a
 * string with exactly one non-white-space character (e.g. "x\n") was
 * wrongly reported as blank.
 */
static char *HYDI_pbs_trim_space(char *str)
{
    char *first, *last;

    if (str == NULL)
        return NULL;

    /* Locate the first non-white-space character.  The cast keeps the
     * <ctype.h> argument in the unsigned char range, as isspace() requires. */
    first = str;
    while (*first != '\0' && isspace((unsigned char) *first))
        first++;

    /* Empty or all white space: nothing to return and nothing to trim. */
    if (*first == '\0')
        return NULL;

    /* Locate the last non-white-space character and terminate after it.
     * *first is known to be non-white-space, so the scan stops there at
     * the latest. */
    last = first + strlen(first) - 1;
    while (last > first && isspace((unsigned char) *last))
        last--;
    last[1] = '\0';

    return first;
}
/*
 * Read the PBS node file and build the node table.
 *
 * nodefile: path of the file to read, one node name per line.
 *
 * The file is read twice: once to count its lines and once to fill an
 * array with one entry per line.  Each entry's id is the zero-based line
 * index and its name is the line with surrounding white space removed (a
 * blank line yields an empty name so that ids stay aligned with line
 * numbers).  On success the array and its length are stored in
 * HYDT_bscd_pbs_sys->nodes and HYDT_bscd_pbs_sys->num_nodes.
 *
 * Returns HYD_SUCCESS, or the status set by the error macros when the
 * path is NULL, the file cannot be opened, or the allocation fails.
 *
 * NOTE(review): this function calls HYDU_FUNC_EXIT() without a matching
 * HYDU_FUNC_ENTER(); left as found.
 * NOTE(review): lines longer than HYDT_PBS_STRLEN - 1 bytes are read by
 * fgets() in several pieces and therefore counted as several nodes.
 */
static HYD_status HYDI_pbs_parse_for_nodes(const char *nodefile)
{
    char line[HYDT_PBS_STRLEN];
    FILE *fp = NULL;
    int idx;
    int num_nodes;
    struct HYDT_bscd_pbs_node *nodes = NULL;
    HYD_status status = HYD_SUCCESS;

    /* fopen(NULL, ...) is undefined; fail cleanly instead. */
    if (nodefile == NULL) {
        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
                            "PBS nodefile name is not set.\n");
    }

    if ((fp = fopen(nodefile, "r")) == NULL) {
        HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
                            "PBS nodefile, %s, can't be opened.\n", nodefile);
    }

    /* Go over once to find the number of lines */
    for (num_nodes = 0; fgets(line, HYDT_PBS_STRLEN, fp); num_nodes++);

    /* Allocate the memory for the array of nodes.  Presumably the macro
     * jumps to fn_fail on failure; fp is closed on that path below. */
    HYDU_MALLOC(nodes, struct HYDT_bscd_pbs_node *,
                num_nodes * sizeof(struct HYDT_bscd_pbs_node), status);

    /* Second pass: fill in the entries.  The loop is bounded by the
     * allocated count so a file that grew between passes cannot overrun
     * the array. */
    rewind(fp);
    for (idx = 0; idx < num_nodes && fgets(line, HYDT_PBS_STRLEN, fp); idx++) {
        const char *name = HYDI_pbs_trim_space(line);

        /* The trim helper returns NULL for a blank line. */
        if (name == NULL)
            name = "";

        nodes[idx].id = idx;
        strncpy(nodes[idx].name, name, HYDT_PBS_STRLEN - 1);
        nodes[idx].name[HYDT_PBS_STRLEN - 1] = '\0';
    }
    /* Only publish entries that were actually filled in. */
    num_nodes = idx;

    /* Update global PBS data structure */
    HYDT_bscd_pbs_sys->num_nodes = num_nodes;
    HYDT_bscd_pbs_sys->nodes = nodes;

  fn_exit:
    /* Single close point, reached on both success and failure. */
    if (fp != NULL)
        fclose(fp);
    HYDU_FUNC_EXIT();
    return status;

  fn_fail:
    goto fn_exit;
}
/* Comparison function for bsearch() of array of pbs_node[] based on name. */
static int cmp_pbsnode(const void *m1, const void *m2)
{
struct HYDT_bscd_pbs_node *n1 = (struct HYDT_bscd_pbs_node *) m1;
struct HYDT_bscd_pbs_node *n2 = (struct HYDT_bscd_pbs_node *) m2;
return strcmp(n1->name, n2->name);
}
HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
int *control_fd)
{
int proxy_count, spawned_count, args_count;
int is_rmk_pbs, idx, ierr;
int proxy_count, spawned_count;
int args_count, events_count;
int ierr, idx, spawned_hostID;
struct HYD_proxy *proxy;
char *targs[HYD_NUM_TMP_STRINGS];
char *nodefile = NULL;
HYD_status status = HYD_SUCCESS;
#if defined(TS_PROFILE)
double stime, etime;
#endif
HYDU_FUNC_ENTER();
/* Determine what RMK is being used */
is_rmk_pbs = !strcmp(HYDT_bsci_info.rmk, "pbs");
/* Parse PBS_NODEFILE for all the node names */
nodefile = (char *) getenv("PBS_NODEFILE");
if (!nodefile)
/* If the RMK is not PBS, error out for the time being. This needs
* to be modified to reparse the host file and find the spawn IDs
* separately. */
if (strcmp(HYDT_bsci_info.rmk, "pbs"))
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"PBS_NODEFILE is undefined in PBS launcher.\n");
HYDI_pbs_parse_for_nodes(nodefile);
if (HYDT_bsci_info.debug) {
for (idx=0; idx<HYDT_bscd_pbs_sys->num_nodes; idx++) {
HYDU_dump(stdout, "ID=%d, name=%s.\n",
(HYDT_bscd_pbs_sys->nodes[idx]).id,
(HYDT_bscd_pbs_sys->nodes[idx]).name);
}
}
"using a non-PBS RMK with the PBS launcher is not supported\n");
proxy_count = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next)
......@@ -137,8 +35,7 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
/* Check if number of proxies > number of processes in this PBS job */
if (proxy_count > (HYDT_bscd_pbs_sys->tm_root).tm_nnodes)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Number of proxies(%d) > TM node count(%d)!\n",
proxy_count,
"Number of proxies(%d) > TM node count(%d)!\n", proxy_count,
(HYDT_bscd_pbs_sys->tm_root).tm_nnodes);
/* Duplicate the args in local copy, targs */
......@@ -151,90 +48,31 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
HYDU_MALLOC(HYDT_bscd_pbs_sys->events, tm_event_t *,
HYDT_bscd_pbs_sys->size * sizeof(tm_event_t), status);
/* Sort the pbs_node[] in ascending name order for bsearch() when RMK=PBS */
if (is_rmk_pbs)
qsort(HYDT_bscd_pbs_sys->nodes, HYDT_bscd_pbs_sys->num_nodes,
sizeof(struct HYDT_bscd_pbs_node), cmp_pbsnode);
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
/* Spawn a process on each allocated node through tm_spawn() which
* returns a taskID for the process + a eventID for the spawning.
* The returned taskID won't be ready for access until tm_poll()
* has returned the corresponding eventID. */
if (is_rmk_pbs) {
if (HYDT_bsci_info.debug)
HYDU_dump(stdout,"RMK == PBS\n");
spawned_count = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next) {
struct HYDT_bscd_pbs_node key, *found;
strncpy(key.name, proxy->node->hostname, HYDT_PBS_STRLEN);
found = bsearch(&key,
HYDT_bscd_pbs_sys->nodes,
HYDT_bscd_pbs_sys->num_nodes,
sizeof(struct HYDT_bscd_pbs_node), cmp_pbsnode);
if (found == NULL) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Can't locate proxy, %s, in PBS nodefile\n",
proxy->node->hostname);
}
targs[args_count] = HYDU_int_to_str(proxy->proxy_id);
ierr = tm_spawn(args_count + 1, targs, NULL, found->id,
HYDT_bscd_pbs_sys->taskIDs + spawned_count,
HYDT_bscd_pbs_sys->events + spawned_count);
if (HYDT_bsci_info.debug)
HYDU_dump(stdout, "DEBUG: %d, tm_spawn(TM_nodeID=%d,name=%s)\n",
spawned_count, found->id, found->name);
if (ierr != TM_SUCCESS) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_spawn() fails with TM err %d!\n", ierr);
}
spawned_count++;
}
HYDT_bscd_pbs_sys->spawned_count = spawned_count;
}
else {
char *spawned_name;
spawned_count = 0;
spawned_name = NULL;
spawned_count = 0;
spawned_hostID = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next) {
targs[args_count] = HYDU_int_to_str(spawned_count);
ierr = tm_spawn(args_count + 1, targs, NULL, spawned_hostID,
HYDT_bscd_pbs_sys->taskIDs + spawned_count,
HYDT_bscd_pbs_sys->events + spawned_count);
if (HYDT_bsci_info.debug)
HYDU_dump(stdout,"RMK != PBS\n");
for (idx=0; idx < HYDT_bscd_pbs_sys->num_nodes; idx++) {
struct HYDT_bscd_pbs_node *found;
if ( !spawned_name
|| strcmp(spawned_name, (HYDT_bscd_pbs_sys->nodes[idx]).name) )
spawned_name = (HYDT_bscd_pbs_sys->nodes[idx]).name;
else
continue;
found = &(HYDT_bscd_pbs_sys->nodes[idx]);
/* ? Pavan : Not sure what proxyID is, use spawned_count for now */
targs[args_count] = HYDU_int_to_str(spawned_count);
ierr = tm_spawn(args_count + 1, targs, NULL, found->id,
HYDT_bscd_pbs_sys->taskIDs + spawned_count,
HYDT_bscd_pbs_sys->events + spawned_count);
if (HYDT_bsci_info.debug)
HYDU_dump(stdout, "DEBUG: %d, tm_spawn(TM_nodeID=%d,name=%s)\n",
spawned_count, found->id, found->name);
if (ierr != TM_SUCCESS) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_spawn() fails with TM err %d!\n", ierr);
}
spawned_count++;
HYDU_dump(stdout, "PBS_DEBUG: %d, tm_spawn(hostID=%d,name=%s)\n",
spawned_count, spawned_hostID, proxy->node->hostname);
if (ierr != TM_SUCCESS) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_spawn() fails with TM err %d!\n", ierr);
}
HYDT_bscd_pbs_sys->spawned_count = spawned_count;
spawned_hostID += proxy->node->core_count;
spawned_count++;
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
HYDU_dump(stdout, "tm_spawn() loop takes %f\n", etime-stime );
#endif
HYDT_bscd_pbs_sys->spawned_count = spawned_count;
#if defined(COMMENTED_OUT)
/* Poll the TM for the spawning eventID returned by tm_spawn() to
* determine if the spawned process has started. */
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
events_count = 0;
while (events_count < spawned_count) {
tm_event_t event = -1;
......@@ -242,8 +80,7 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
ierr = tm_poll(TM_NULL_EVENT, &event, 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(spawn_event) fails with TM err %d.\n",
ierr);
"tm_poll(spawn_event) fails with TM err %d.\n", ierr);
if (event != TM_NULL_EVENT) {
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == event) {
......@@ -253,16 +90,11 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
event, HYDT_bscd_pbs_sys->taskIDs[idx]);
}
events_count++;
break; /* break from for(idx<spawned_count) loop */
break; /* break from for(idx<spawned_count) loop */
}
}
}
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
HYDU_dump(stdout, "tm_poll(spawn_events) loop takes %f\n", etime-stime );
#endif
#endif
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -8,39 +8,20 @@
#include "bsci.h"
#include "pbs.h"
/* #define TS_PROFILE 1 */
#if defined(TS_PROFILE)
double TS_Wtime( void );
#endif
/* integer comparison function for tm_event_t[] */
static int cmp_pbsevent( const void *p1, const void *p2 )
{
return (*(tm_event_t*)p1) - (*(tm_event_t*)p2);
}
HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
{
tm_event_t *pollevents = NULL;
int time_elapsed;
int events_count, spawned_count;
int mismatch, idx, ierr;
struct timeval start_tval, curr_tval;
HYD_status status = HYD_SUCCESS;
#if defined(TS_PROFILE)
double stime, etime;
#endif
int time_elapsed;
int events_count, spawned_count;
int idx, ierr;
struct timeval start_tval, curr_tval;
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
/* Allocate memory for taskobits[] and pollevents[]. */
/* Allocate memory for taskobits[] */
HYDU_MALLOC(HYDT_bscd_pbs_sys->taskobits, int *,
HYDT_bscd_pbs_sys->size * sizeof(int), status);
spawned_count = HYDT_bscd_pbs_sys->spawned_count;
HYDU_MALLOC(pollevents, tm_event_t *,
spawned_count * sizeof(tm_event_t), status);
/*
* FIXME: We rely on gettimeofday here. This needs to detect the
......@@ -49,57 +30,6 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
*/
gettimeofday(&start_tval, NULL);
/* Poll the TM for the spawning eventID returned by tm_spawn() to
* determine if the spawned process has started. */
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
events_count = 0;
while (events_count < spawned_count) {
int poll_err;
ierr = tm_poll(TM_NULL_EVENT, &pollevents[events_count], 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(spawn_event) fails with TM err %d.\n",
ierr);
if (pollevents[events_count] != TM_NULL_EVENT) {
events_count++;
}
}
/* Sort HYDT_bscd_pbs_sys->events[] and pollevents[] */
qsort(HYDT_bscd_pbs_sys->events, spawned_count,
sizeof(tm_event_t), cmp_pbsevent);
qsort(pollevents, spawned_count, sizeof(tm_event_t), cmp_pbsevent);
mismatch = 0;
for ( idx = 0; idx < spawned_count; idx++ ) {
if ( pollevents[idx] != HYDT_bscd_pbs_sys->events[idx] ) {
mismatch = 1;
break;
}
}
if ( mismatch ) {
/* Pavan: what is the correct HYDU macro to do this kind of fprintf ? */
fprintf(stderr, "tm_spawn()'s events[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", HYDT_bscd_pbs_sys->events[idx]);
fprintf(stderr, "\n");
fprintf(stderr, "pollevents[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", pollevents[idx]);
fprintf(stderr, "\n");
fflush(stderr);
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Inconsistent pollevents[] & tm_spawn()'s events[]:\n");
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
HYDU_dump(stdout, "tm_poll(spawn_events) loop takes %f\n", etime-stime );
#endif
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
/* Register with TM to be notified the obituary of the spawning process. */
for (idx = 0; idx < spawned_count; idx++) {
/*
......@@ -107,8 +37,7 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
* the process labelled by taskID dies
*/
ierr = tm_obit(HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits + idx,
HYDT_bscd_pbs_sys->events + idx);
HYDT_bscd_pbs_sys->taskobits + idx, HYDT_bscd_pbs_sys->events + idx);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_obit() fails with TM err=%d.\n", ierr);
......@@ -120,36 +49,38 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits[idx]);
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
HYDU_dump(stdout, "tm_obit() loop takes %f\n", etime-stime );
#endif
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
/* Sort HYDT_bscd_pbs_sys->events[] */
qsort(HYDT_bscd_pbs_sys->events, spawned_count,
sizeof(tm_event_t), cmp_pbsevent);
/* Poll if the spawned process has exited */
events_count = 0;
/* Subtract all the processes that have already exited */
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == TM_NULL_EVENT) {
pollevents[events_count] = TM_NULL_EVENT;
if (HYDT_bscd_pbs_sys->events[idx] == TM_NULL_EVENT)
events_count++;
}
}
/* Polling for the remaining alive processes till they all exit */
while (events_count < spawned_count) {
tm_event_t event = -1;
int poll_err;
ierr = tm_poll(TM_NULL_EVENT, &pollevents[events_count], 0, &poll_err);
ierr = tm_poll(TM_NULL_EVENT, &event, 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(obit_event) fails with err=%d.\n",
ierr);
if (pollevents[events_count] != TM_NULL_EVENT) {
events_count++;
"tm_poll(obit_event) fails with err=%d.\n", ierr);
if (event != TM_NULL_EVENT) {
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == event) {
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout,
"PBS_DEBUG: Event %d received, task %d exits with status %d.\n",
event, HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits[idx]);
/*
* HYDU_error_printf("DEBUG: Event %d received, task %d exits with status %d.\n", event, HYDT_bscd_pbs_sys->taskIDs[idx], HYDT_bscd_pbs_sys->taskobits[idx]);
*/
}
events_count++;
break; /* break from for(idx<spawned_count) loop */
}
}
}
/* Check if time is up */
......@@ -161,43 +92,12 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
goto fn_exit;
}
}
} /* while (events_count < spawned_count) */
/* Sort pollevents[] in sync with HYDT_bscd_pbs_sys->events[] */
qsort(pollevents, spawned_count, sizeof(tm_event_t), cmp_pbsevent);
mismatch = 0;
for ( idx = 0; idx < spawned_count; idx++ ) {
if ( pollevents[idx] != HYDT_bscd_pbs_sys->events[idx] ) {
mismatch = 1;
break;
}
}
if ( mismatch ) {
/* Pavan: what is the correct HYDU macro to do this kind of fprintf ? */
fprintf(stderr, "tm_obit()'s events[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", HYDT_bscd_pbs_sys->events[idx]);
fprintf(stderr, "\n");
fprintf(stderr, "pollevents[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", pollevents[idx]);
fprintf(stderr, "\n");
fflush(stderr);
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Inconsistent pollevents[] & tm_obit()'s events[]:\n");
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
HYDU_dump(stdout, "tm_poll(obit_events) loop takes %f\n", etime-stime );
#endif
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout, "\nPBS_DEBUG: Done with polling obit events!\n");
}
if (pollevents)
HYDU_FREE(pollevents);
/* Loop till all sockets have closed */
fn_exit:
HYDU_FUNC_EXIT();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment