Commit 53f6efa3 authored by Pavan Balaji
Browse files

[svn-r9001] Make a pass on the PBS launcher code to remove unnecessary code and clean up the naming convention.

Reviewed by chan.
parent a788acc4
......@@ -13,12 +13,9 @@
#include "tm.h"
/* State shared by the PBS (TM) launcher routines, reached through the
 * global HYDT_bscd_pbs_sys pointer.
 *
 * NOTE(review): this listing comes from a diff view whose hunk header
 * reports 12 lines before and 9 lines after the change, so it appears to
 * interleave the old fields (tm_root, spawned_count, size, taskIDs,
 * events, taskobits) with the new ones (spawn_count, spawn_events,
 * task_id) -- confirm against the actual header before relying on it. */
struct HYDT_bscd_pbs_sys {
/* Filled in by tm_init(); its tm_nnodes is compared against the proxy
 * count before spawning. */
struct tm_roots tm_root;
/* Number of tm_spawn() calls made by the launch routine. */
int spawned_count;
/* Number of proxies in the proxy list; used to size the arrays below. */
int size;
tm_task_id *taskIDs; /* Array of TM task(process) IDs, written via tm_spawn() */
tm_event_t *events; /* Array of TM event IDs, passed to tm_spawn() and tm_obit() */
int *taskobits; /* Array of TM task exit codes, passed to tm_obit() */
/* Number of proxies for which tm_spawn() was called. */
int spawn_count;
/* Array of event IDs passed to tm_spawn(), one per proxy; compared
 * against the events returned by tm_poll(). */
tm_event_t *spawn_events;
/* Per the launcher's own comment, TM stores each entry's address at
 * tm_spawn() time and fills it in once tm_poll() completes the
 * corresponding spawn event. */
tm_task_id *task_id; /* Array of TM task(process) IDs */
};
extern struct HYDT_bscd_pbs_sys *HYDT_bscd_pbs_sys;
......
......@@ -15,20 +15,16 @@ HYD_status HYDT_bscd_pbs_launcher_finalize(void)
HYDU_FUNC_ENTER();
#if defined(HAVE_TM_H)
/* FIXME: Finalizing the TM library seems to be giving some weird
* errors, so we ignore it for the time being. */
err = tm_finalize();
if (err != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "error calling tm_finalize\n");
HYDU_ERR_CHKANDJUMP(status, err != TM_SUCCESS, HYD_INTERNAL_ERROR,
"error calling tm_finalize\n");
#endif /* HAVE_TM_H */
if (HYDT_bscd_pbs_sys) {
if (HYDT_bscd_pbs_sys->taskobits)
HYDU_FREE(HYDT_bscd_pbs_sys->taskobits);
if (HYDT_bscd_pbs_sys->events)
HYDU_FREE(HYDT_bscd_pbs_sys->events);
if (HYDT_bscd_pbs_sys->taskIDs)
HYDU_FREE(HYDT_bscd_pbs_sys->taskIDs);
if (HYDT_bscd_pbs_sys->task_id)
HYDU_FREE(HYDT_bscd_pbs_sys->task_id);
if (HYDT_bscd_pbs_sys->spawn_events)
HYDU_FREE(HYDT_bscd_pbs_sys->spawn_events);
HYDU_FREE(HYDT_bscd_pbs_sys);
}
......
......@@ -13,7 +13,8 @@ struct HYDT_bscd_pbs_sys *HYDT_bscd_pbs_sys;
HYD_status HYDT_bsci_launcher_pbs_init(void)
{
int ierr;
int err;
struct tm_roots tm_root;
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
......@@ -26,18 +27,13 @@ HYD_status HYDT_bsci_launcher_pbs_init(void)
HYDU_MALLOC(HYDT_bscd_pbs_sys, struct HYDT_bscd_pbs_sys *,
sizeof(struct HYDT_bscd_pbs_sys), status);
/* Initialize TM and Hydra's PBS data structure: Nothing in the
* returned tm_root is useful except tm_root.tm_nnodes which is
* the number of processes allocated in this PBS job. */
ierr = tm_init(NULL, &(HYDT_bscd_pbs_sys->tm_root));
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "tm_init() fails with TM err=%d.\n",
ierr);
HYDT_bscd_pbs_sys->spawned_count = 0;
HYDT_bscd_pbs_sys->size = 0;
HYDT_bscd_pbs_sys->taskIDs = NULL;
HYDT_bscd_pbs_sys->events = NULL;
HYDT_bscd_pbs_sys->taskobits = NULL;
/* Initialize TM and Hydra's PBS data structure */
err = tm_init(NULL, &tm_root);
HYDU_ERR_CHKANDJUMP(status, err != TM_SUCCESS, HYD_INTERNAL_ERROR,
"tm_init() failed with TM error %d\n", err);
HYDT_bscd_pbs_sys->spawn_count = 0;
HYDT_bscd_pbs_sys->spawn_events = NULL;
HYDT_bscd_pbs_sys->task_id = NULL;
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -11,9 +11,7 @@
HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
int *control_fd)
{
int proxy_count, spawned_count;
int args_count, events_count;
int ierr, idx, spawned_hostID;
int proxy_count, i, args_count, events_count, err, idx, hostid;
struct HYD_proxy *proxy;
char *targs[HYD_NUM_TMP_STRINGS];
HYD_status status = HYD_SUCCESS;
......@@ -30,71 +28,34 @@ HYD_status HYDT_bscd_pbs_launch_procs(char **args, struct HYD_proxy *proxy_list,
proxy_count = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next)
proxy_count++;
HYDT_bscd_pbs_sys->size = proxy_count;
/* Check if number of proxies > number of processes in this PBS job */
if (proxy_count > (HYDT_bscd_pbs_sys->tm_root).tm_nnodes)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"Number of proxies(%d) > TM node count(%d)!\n", proxy_count,
(HYDT_bscd_pbs_sys->tm_root).tm_nnodes);
/* Duplicate the args in local copy, targs */
for (args_count = 0; args[args_count]; args_count++)
targs[args_count] = HYDU_strdup(args[args_count]);
/* Allocate memory for taskIDs[] and events[] arrays */
HYDU_MALLOC(HYDT_bscd_pbs_sys->taskIDs, tm_task_id *,
HYDT_bscd_pbs_sys->size * sizeof(tm_task_id), status);
HYDU_MALLOC(HYDT_bscd_pbs_sys->events, tm_event_t *,
HYDT_bscd_pbs_sys->size * sizeof(tm_event_t), status);
HYDU_MALLOC(HYDT_bscd_pbs_sys->task_id, tm_task_id *, proxy_count * sizeof(tm_task_id),
status);
HYDU_MALLOC(HYDT_bscd_pbs_sys->spawn_events, tm_event_t *,
proxy_count * sizeof(tm_event_t), status);
/* Spawn a process on each allocated node through tm_spawn() which
* returns a taskID for the process + an eventID for the spawning.
* The returned taskID won't be ready for access until tm_poll()
* has returned the corresponding eventID. */
spawned_count = 0;
spawned_hostID = 0;
for (proxy = proxy_list; proxy; proxy = proxy->next) {
targs[args_count] = HYDU_int_to_str(spawned_count);
ierr = tm_spawn(args_count + 1, targs, NULL, spawned_hostID,
HYDT_bscd_pbs_sys->taskIDs + spawned_count,
HYDT_bscd_pbs_sys->events + spawned_count);
if (HYDT_bsci_info.debug)
HYDU_dump(stdout, "PBS_DEBUG: %d, tm_spawn(hostID=%d,name=%s)\n",
spawned_count, spawned_hostID, proxy->node->hostname);
if (ierr != TM_SUCCESS) {
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_spawn() fails with TM err %d!\n", ierr);
}
spawned_hostID += proxy->node->core_count;
spawned_count++;
}
HYDT_bscd_pbs_sys->spawned_count = spawned_count;
* returns a taskID for the process + an eventID for the
* spawning. */
hostid = 0;
for (i = 0, proxy = proxy_list; proxy; proxy = proxy->next, i++) {
targs[args_count] = HYDU_int_to_str(i);
/* Poll the TM for the spawning eventID returned by tm_spawn() to
* determine if the spawned process has started. */
events_count = 0;
while (events_count < spawned_count) {
tm_event_t event = -1;
int poll_err;
ierr = tm_poll(TM_NULL_EVENT, &event, 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(spawn_event) fails with TM err %d.\n", ierr);
if (event != TM_NULL_EVENT) {
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == event) {
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout,
"PBS_DEBUG: Event %d received, task %d has started.\n",
event, HYDT_bscd_pbs_sys->taskIDs[idx]);
}
events_count++;
break; /* break from for(idx<spawned_count) loop */
}
}
}
/* The task_id field is not filled in during tm_spawn(). The
* TM library just stores this address and fills it in when
* the event is completed by a call to tm_poll(). */
err = tm_spawn(args_count + 1, targs, NULL, hostid, &HYDT_bscd_pbs_sys->task_id[i],
&HYDT_bscd_pbs_sys->spawn_events[i]);
HYDU_ERR_CHKANDJUMP(status, err != TM_SUCCESS, HYD_INTERNAL_ERROR,
"tm_spawn() failed with TM error %d\n", err);
hostid += proxy->node->core_count;
}
HYDT_bscd_pbs_sys->spawn_count = i;
fn_exit:
HYDU_FUNC_EXIT();
......
......@@ -10,75 +10,73 @@
HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
{
int time_elapsed;
int events_count, spawned_count;
int idx, ierr;
int time_elapsed, events_count, spawn_count, idx, err, *taskobits, poll_err;
struct timeval start_tval, curr_tval;
tm_event_t e;
tm_event_t *obit_events;
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();
/* Allocate memory for taskobits[] */
HYDU_MALLOC(HYDT_bscd_pbs_sys->taskobits, int *,
HYDT_bscd_pbs_sys->size * sizeof(int), status);
spawned_count = HYDT_bscd_pbs_sys->spawned_count;
HYDU_MALLOC(taskobits, int *, HYDT_bscd_pbs_sys->spawn_count * sizeof(int), status);
HYDU_MALLOC(obit_events, tm_event_t *, HYDT_bscd_pbs_sys->spawn_count * sizeof(tm_event_t),
status);
/*
* FIXME: We rely on gettimeofday here. This needs to detect the
for (idx = 0; idx < HYDT_bscd_pbs_sys->spawn_count; idx++)
obit_events[idx] = TM_NULL_EVENT;
/* FIXME: We rely on gettimeofday here. This needs to detect the
* timer type available and use that. Probably more of an MPL
* functionality than Hydra's.
*/
* functionality than Hydra's. */
gettimeofday(&start_tval, NULL);
/* Register with TM to be notified of the obituary of each spawned process. */
for (idx = 0; idx < spawned_count; idx++) {
/*
* Get a TM event which will be returned by tm_poll() when
* the process labelled by taskID dies
*/
ierr = tm_obit(HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits + idx, HYDT_bscd_pbs_sys->events + idx);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_obit() fails with TM err=%d.\n", ierr);
if (HYDT_bscd_pbs_sys->events[idx] == TM_ERROR_EVENT)
HYDU_error_printf("tm_obit(Task %d) returns error.\n",
HYDT_bscd_pbs_sys->taskIDs[idx]);
if (HYDT_bscd_pbs_sys->events[idx] == TM_NULL_EVENT)
HYDU_error_printf("Task %d already exits with status %d\n",
HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits[idx]);
}
spawn_count = HYDT_bscd_pbs_sys->spawn_count;
while (spawn_count) {
/* For each task, we need to first wait for it to be spawned,
* and then register its termination event */
err = tm_poll(TM_NULL_EVENT, &e, 0, &poll_err);
HYDU_ERR_CHKANDJUMP(status, err != TM_SUCCESS, HYD_INTERNAL_ERROR,
"tm_poll(obit_event) failed with TM error %d\n", err);
/* Poll if the spawned process has exited */
events_count = 0;
/* Subtract all the processes that have already exited */
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == TM_NULL_EVENT)
events_count++;
if (e != TM_NULL_EVENT) {
/* got some event; find the tm_spawn() call which created
* this event */
for (idx = 0; idx < HYDT_bscd_pbs_sys->spawn_count; idx++) {
if (HYDT_bscd_pbs_sys->spawn_events[idx] == e) {
/* got a spawn event (task_id[idx] is now valid);
* register this task for a termination event */
err = tm_obit(HYDT_bscd_pbs_sys->task_id[idx], &taskobits[idx],
&obit_events[idx]);
HYDU_ERR_CHKANDJUMP(status, err != TM_SUCCESS, HYD_INTERNAL_ERROR,
"tm_obit() failed with TM error %d\n", err);
spawn_count--;
break;
}
else if (obit_events[idx] == e) {
/* got a task termination event */
obit_events[idx] = TM_NULL_EVENT;
}
/* Polling for the remaining alive processes till they all exit */
while (events_count < spawned_count) {
tm_event_t event = -1;
int poll_err;
ierr = tm_poll(TM_NULL_EVENT, &event, 0, &poll_err);
if (ierr != TM_SUCCESS)
HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR,
"tm_poll(obit_event) fails with err=%d.\n", ierr);
if (event != TM_NULL_EVENT) {
for (idx = 0; idx < spawned_count; idx++) {
if (HYDT_bscd_pbs_sys->events[idx] == event) {
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout,
"PBS_DEBUG: Event %d received, task %d exits with status %d.\n",
event, HYDT_bscd_pbs_sys->taskIDs[idx],
HYDT_bscd_pbs_sys->taskobits[idx]);
/*
* HYDU_error_printf("DEBUG: Event %d received, task %d exits with status %d.\n", event, HYDT_bscd_pbs_sys->taskIDs[idx], HYDT_bscd_pbs_sys->taskobits[idx]);
*/
}
}
}
/* All the tasks have been spawned (and some of them have terminated) */
/* Wait for all processes to terminate */
for (events_count = 0, idx = 0; idx < HYDT_bscd_pbs_sys->spawn_count; idx++)
if (obit_events[idx] == TM_NULL_EVENT)
events_count++;
break; /* break from for(idx<spawned_count) loop */
do {
err = tm_poll(TM_NULL_EVENT, &e, 0, &poll_err);
HYDU_ERR_CHKANDJUMP(status, err != TM_SUCCESS, HYD_INTERNAL_ERROR,
"tm_poll(obit_event) failed with TM error %d\n", err);
if (e != TM_NULL_EVENT) {
/* got some event; check if it is what we want */
for (idx = 0; idx < HYDT_bscd_pbs_sys->spawn_count; idx++) {
if (obit_events[idx] == e) {
events_count--;
break;
}
}
}
......@@ -92,13 +90,11 @@ HYD_status HYDT_bscd_pbs_wait_for_completion(int timeout)
goto fn_exit;
}
}
}
} while (events_count);
if (HYDT_bsci_info.debug) {
HYDU_dump(stdout, "\nPBS_DEBUG: Done with polling obit events!\n");
}
if (HYDT_bsci_info.debug)
HYDU_dump(stdout, "\nPBS_DEBUG: Done with polling obit events\n");
/* Loop till all sockets have closed */
fn_exit:
HYDU_FUNC_EXIT();
return status;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment