Commit d7397d3b authored by Anthony Chan's avatar Anthony Chan
Browse files

[svn-r8968] Added parsing of PBS_NODEFILE within pbs launcher. Added search...

[svn-r8968] Added parsing of PBS_NODEFILE within pbs launcher. Added search for proxy hostname in PBS_NODEFILE using bsearch. Replaced the linear search mechanism of polling of events from tm_spawn() and tm_obit() by qsort() and bsearch().
parent 46de6bdb
......@@ -12,13 +12,21 @@
#if defined(HAVE_TM_H)
#include "tm.h"
#define HYDT_PBS_STRLEN 160
struct HYDT_bscd_pbs_node {
tm_node_id id;
char name[HYDT_PBS_STRLEN];
};
struct HYDT_bscd_pbs_sys {
struct tm_roots tm_root;
int spawned_count;
int size;
tm_task_id *taskIDs; /* Array of TM task(process) IDs */
tm_event_t *events; /* Array of TM event IDs */
int *taskobits; /* Array of TM task exit codes */
struct tm_roots tm_root;
int spawned_count;
int size;
tm_task_id *taskIDs; /* Array of TM task(process) IDs */
tm_event_t *events; /* Array of TM event IDs */
int *taskobits; /* Array of TM task exit codes */
int num_nodes; /* Number of elements in nodes[] */
struct HYDT_bscd_pbs_node *nodes; /* Array of pbs_nodes[] */
};
extern struct HYDT_bscd_pbs_sys *HYDT_bscd_pbs_sys;
......
......@@ -29,6 +29,8 @@ HYD_status HYDT_bscd_pbs_launcher_finalize(void)
HYDU_FREE(HYDT_bscd_pbs_sys->events);
if (HYDT_bscd_pbs_sys->taskIDs)
HYDU_FREE(HYDT_bscd_pbs_sys->taskIDs);
if (HYDT_bscd_pbs_sys->nodes)
HYDU_FREE(HYDT_bscd_pbs_sys->nodes);
HYDU_FREE(HYDT_bscd_pbs_sys);
}
......
......@@ -11,10 +11,77 @@
#if defined(HAVE_TM_H)
struct HYDT_bscd_pbs_sys *HYDT_bscd_pbs_sys;
static char* HYD_pbs_trim_space( char *str )
{
char *newstr = NULL;
int len, idx;
len = strlen( str );
/* Locate the Last non-white space character and pad it with NULL */
for (idx=len-1; idx>=0; idx--) {
if ( !isspace(str[idx]) ) {
str[idx+1] = 0;
len = idx;
break;
}
}
/* Locate the First non-white space character */
for (idx=0; idx < len; idx++) {
if ( !isspace(str[idx]) ) {
newstr = &(str[idx]);
break;
}
}
return newstr;
}
static HYD_status HYPU_pbs_parse_for_nodes(const char *nodefile)
{
char line[HYDT_PBS_STRLEN];
FILE *fp;
int idx;
int num_nodes;
struct HYDT_bscd_pbs_node *nodes = NULL;
HYD_status status = HYD_SUCCESS;
if ((fp = fopen(nodefile, "r")) == NULL) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"unable to open host file: %s\n", nodefile);
}
/* Go over once to find the number of lines */
for (num_nodes = 0; fgets(line, HYDT_PBS_STRLEN, fp); num_nodes++) ;
/* Allocate the memory for the array of nodes */
HYDU_MALLOC(nodes, struct HYDT_bscd_pbs_node *,
num_nodes * sizeof(struct HYDT_bscd_pbs_node), status);
/* Allocate the memory for each of member in the array of nodes */
rewind(fp);
for (idx = 0; fgets(line, HYDT_PBS_STRLEN, fp); idx++) {
nodes[idx].id = idx;
strncpy(nodes[idx].name, HYD_pbs_trim_space(line), HYDT_PBS_STRLEN);
}
fclose(fp);
/* Update global PBS data structure */
HYDT_bscd_pbs_sys->num_nodes = num_nodes;
HYDT_bscd_pbs_sys->nodes = nodes;
fn_exit:
HYDU_FUNC_EXIT();
return status;
fn_fail:
goto fn_exit;
}
HYD_status HYDT_bsci_launcher_pbs_init(void)
{
char *nodefile = NULL;
int ierr;
HYD_status status = HYD_SUCCESS;
int idx;
HYDU_FUNC_ENTER();
......@@ -39,6 +106,24 @@ HYD_status HYDT_bsci_launcher_pbs_init(void)
HYDT_bscd_pbs_sys->taskIDs = NULL;
HYDT_bscd_pbs_sys->events = NULL;
HYDT_bscd_pbs_sys->taskobits = NULL;
HYDT_bscd_pbs_sys->num_nodes = 0;
HYDT_bscd_pbs_sys->nodes = NULL;
/* Parse PBS_NODEFILE for all the node names */
nodefile = (char *) getenv("PBS_NODEFILE");
if (!nodefile)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"PBS_NODEFILE is undefined in PBS launcher.\n");
HYPU_pbs_parse_for_nodes(nodefile);
if (HYDT_bsci_info.debug) {
for (idx=0; idx<HYDT_bscd_pbs_sys->num_nodes; idx++) {
HYDU_dump(stdout, "ID=%d, name=%s.\n",
(HYDT_bscd_pbs_sys->nodes[idx]).id,
(HYDT_bscd_pbs_sys->nodes[idx]).name);
}
}
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -21,11 +21,19 @@ double TS_Wtime( void )
}
#endif
/* Comparison function for bsearch() of array of pbs_node[] based on name. */
static int cmp_pbsnode(const void *m1, const void *m2)
{
struct HYDT_bscd_pbs_node *n1 = (struct HYDT_bscd_pbs_node *) m1;
struct HYDT_bscd_pbs_node *n2 = (struct HYDT_bscd_pbs_node *) m2;
return strcmp(n1->name, n2->name);
}
HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
int *control_fd)
{
int proxy_count, spawned_count, args_count;
int ierr, spawned_hostID;
int ierr;
struct HYD_proxy *proxy;
char *targs[HYD_NUM_TMP_STRINGS];
HYD_status status = HYD_SUCCESS;
......@@ -65,6 +73,9 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
HYDU_MALLOC(HYDT_bscd_pbs_sys->events, tm_event_t *,
HYDT_bscd_pbs_sys->size * sizeof(tm_event_t), status);
/* Sort the pbs_node[] in ascending name order for bsearch() */
qsort(HYDT_bscd_pbs_sys->nodes, HYDT_bscd_pbs_sys->num_nodes,
sizeof(struct HYDT_bscd_pbs_node), cmp_pbsnode);
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
......@@ -73,20 +84,29 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
* The returned taskID won't be ready for access until tm_poll()
* has returned the corresponding eventID. */
spawned_count = 0;
spawned_hostID = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next) {
struct HYDT_bscd_pbs_node key, *found;
strncpy(key.name, proxy->node->hostname, HYDT_PBS_STRLEN);
found = bsearch(&key,
HYDT_bscd_pbs_sys->nodes,
HYDT_bscd_pbs_sys->num_nodes,
sizeof(struct HYDT_bscd_pbs_node), cmp_pbsnode);
if (found == NULL) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Cannot locate proxy, %s, in PBS nodefile\n",
proxy->node->hostname);
}
targs[args_count] = HYDU_int_to_str(proxy->proxy_id);
ierr = tm_spawn(args_count + 1, targs, NULL, spawned_hostID,
ierr = tm_spawn(args_count + 1, targs, NULL, found->id,
HYDT_bscd_pbs_sys->taskIDs + spawned_count,
HYDT_bscd_pbs_sys->events + spawned_count);
if (HYDT_bsci_info.debug)
HYDU_dump(stdout, "PBS_DEBUG: %d, tm_spawn(hostID=%d,name=%s)\n",
spawned_count, spawned_hostID, proxy->node->hostname);
HYDU_dump(stdout, "PBS_DEBUG: %d, tm_spawn(TM_nodeID=%d,name=%s)\n",
spawned_count, found->id, proxy->node->hostname);
if (ierr != TM_SUCCESS) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_spawn() fails with TM err %d!\n", ierr);
}
spawned_hostID += proxy->node->core_count;
spawned_count++;
}
HYDT_bscd_pbs_sys->spawned_count = spawned_count;
......@@ -95,7 +115,7 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
HYDU_dump(stdout, "tm_spawn() loop takes %f\n", etime-stime );
#endif
#ifdef 0
#if defined(COMMENTED_OUT)
/* Poll the TM for the spawning eventID returned by tm_spawn() to
* determine if the spawned process has started. */
#if defined(TS_PROFILE)
......
......@@ -14,13 +14,20 @@
double TS_Wtime( void );
#endif
/* integer comparison function for tm_event_t[] */
static int cmp_pbsevent( const void *p1, const void *p2 )
{
return (*(tm_event_t*)p1) - (*(tm_event_t*)p2);
}
HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
{
int time_elapsed;
int events_count, spawned_count;
int idx, ierr;
struct timeval start_tval, curr_tval;
HYD_status status = HYD_SUCCESS;
tm_event_t *pollevents = NULL;
int time_elapsed;
int events_count, spawned_count;
int mismatch, idx, ierr;
struct timeval start_tval, curr_tval;
HYD_status status = HYD_SUCCESS;
#if defined(TS_PROFILE)
double stime, etime;
......@@ -28,10 +35,12 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
HYDU_FUNC_ENTER();
/* Allocate memory for taskobits[] */
/* Allocate memory for taskobits[] and pollevents[]. */
HYDU_MALLOC(HYDT_bscd_pbs_sys->taskobits, int *,
HYDT_bscd_pbs_sys->size * sizeof(int), status);
spawned_count = HYDT_bscd_pbs_sys->spawned_count;
HYDU_MALLOC(pollevents, tm_event_t *,
spawned_count * sizeof(tm_event_t), status);
/*
* FIXME: We rely on gettimeofday here. This needs to detect the
......@@ -47,27 +56,42 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
#endif
events_count = 0;
while (events_count < spawned_count) {
tm_event_t event = -1;
int poll_err;
ierr = tm_poll(TM_NULL_EVENT, &event, 0, &poll_err);
ierr = tm_poll(TM_NULL_EVENT, &pollevents[events_count], 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(spawn_event) fails with TM err %d.\n",
ierr);
if (event != TM_NULL_EVENT) {
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == event) {
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout,
"PBS_DEBUG: Event %d received, task %d has started.\n",
event, HYDT_bscd_pbs_sys->taskIDs[idx]);
}
events_count++;
break; /* break from for(idx<spawned_count) loop */
}
}
if (pollevents[events_count] != TM_NULL_EVENT) {
events_count++;
}
}
/* Sort HYDT_bscd_pbs_sys->events[] and pollevents[] */
qsort(HYDT_bscd_pbs_sys->events, spawned_count,
sizeof(tm_event_t), cmp_pbsevent);
qsort(pollevents, spawned_count, sizeof(tm_event_t), cmp_pbsevent);
mismatch = 0;
for ( idx = 0; idx < spawned_count; idx++ ) {
if ( pollevents[idx] != HYDT_bscd_pbs_sys->events[idx] ) {
mismatch = 1;
break;
}
}
if ( mismatch ) {
/* Pavan: what is the correct HYDU macro to do this kind of fprintf ? */
fprintf(stderr, "tm_spawn()'s events[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", HYDT_bscd_pbs_sys->events[idx]);
fprintf(stderr, "\n");
fprintf(stderr, "pollevents[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", pollevents[idx]);
fprintf(stderr, "\n");
fflush(stderr);
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Inconsistent pollevents[] & tm_spawn()'s events[]:\n");
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
HYDU_dump(stdout, "tm_poll(spawn_events) loop takes %f\n", etime-stime );
......@@ -104,37 +128,28 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
#if defined(TS_PROFILE)
stime = TS_Wtime();
#endif
/* Sort HYDT_bscd_pbs_sys->events[] */
qsort(HYDT_bscd_pbs_sys->events, spawned_count,
sizeof(tm_event_t), cmp_pbsevent);
/* Poll if the spawned process has exited */
events_count = 0;
/* Substract all the processes that have already exited */
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == TM_NULL_EVENT)
if (HYDT_bscd_pbs_sys->events[idx] == TM_NULL_EVENT) {
pollevents[events_count] = TM_NULL_EVENT;
events_count++;
}
}
/* Polling for the remaining alive processes till they all exit */
while (events_count < spawned_count) {
tm_event_t event = -1;
int poll_err;
ierr = tm_poll(TM_NULL_EVENT, &event, 0, &poll_err);
ierr = tm_poll(TM_NULL_EVENT, &pollevents[events_count], 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(obit_event) fails with err=%d.\n", ierr);
if (event != TM_NULL_EVENT) {
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == event) {
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout,
"PBS_DEBUG: Event %d received, task %d exits with status %d.\n",
event, HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits[idx]);
/*
* HYDU_error_printf("DEBUG: Event %d received, task %d exits with status %d.\n", event, HYDT_bscd_pbs_sys->taskIDs[idx], HYDT_bscd_pbs_sys->taskobits[idx]);
*/
}
events_count++;
break; /* break from for(idx<spawned_count) loop */
}
}
"tm_poll(obit_event) fails with err=%d.\n",
ierr);
if (pollevents[events_count] != TM_NULL_EVENT) {
events_count++;
}
/* Check if time is up */
......@@ -146,6 +161,30 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
goto fn_exit;
}
}
} /* while (events_count < spawned_count) */
/* Sort pollevents[] in sync with HYDT_bscd_pbs_sys->events[] */
qsort(pollevents, spawned_count, sizeof(tm_event_t), cmp_pbsevent);
mismatch = 0;
for ( idx = 0; idx < spawned_count; idx++ ) {
if ( pollevents[idx] != HYDT_bscd_pbs_sys->events[idx] ) {
mismatch = 1;
break;
}
}
if ( mismatch ) {
/* Pavan: what is the correct HYDU macro to do this kind of fprintf ? */
fprintf(stderr, "tm_obit()'s events[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", HYDT_bscd_pbs_sys->events[idx]);
fprintf(stderr, "\n");
fprintf(stderr, "pollevents[] = ");
for ( idx = 0; idx < spawned_count; idx++ )
fprintf(stderr, "%d ", pollevents[idx]);
fprintf(stderr, "\n");
fflush(stderr);
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Inconsistent pollevents[] & tm_obit()'s events[]:\n");
}
#if defined(TS_PROFILE)
etime = TS_Wtime();
......@@ -156,6 +195,9 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
HYDU_dump(stdout, "\nPBS_DEBUG: Done with polling obit events!\n");
}
if (pollevents)
HYDU_FREE(pollevents);
/* Loop till all sockets have closed */
fn_exit:
HYDU_FUNC_EXIT();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment