Commit 6d299583 authored by Shane Snyder's avatar Shane Snyder

fix pmix to support multiple mids

parent 0cdb34d5
......@@ -121,7 +121,6 @@ int ssg_init()
ABT_rwlock_create(&ssg_rt->lock);
#ifdef SSG_HAVE_PMIX
/* XXX config switch for this functionality */
/* use PMIx event registrations to inform us of terminated/aborted procs */
pmix_status_t err_codes[2] = {PMIX_PROC_TERMINATED, PMIX_ERR_PROC_ABORTED};
PMIx_Register_event_handler(err_codes, 2, NULL, 0,
......@@ -147,9 +146,8 @@ int ssg_finalize()
ABT_rwlock_wrlock(ssg_rt->lock);
#ifdef SSG_HAVE_PMIX
// XXX broken
//if (ssg_rt->pmix_failure_evhdlr_ref)
// PMIx_Deregister_event_handler(ssg_rt->pmix_failure_evhdlr_ref, NULL, NULL);
if (ssg_rt->pmix_failure_evhdlr_ref)
PMIx_Deregister_event_handler(ssg_rt->pmix_failure_evhdlr_ref, NULL, NULL);
#endif
/* destroy all active groups */
......@@ -385,15 +383,19 @@ ssg_group_id_t ssg_group_create_pmix(
{
ssg_mid_state_t *mid_state;
pmix_proc_t tmp_proc;
pmix_data_array_t my_ids_array, *tmp_id_array_ptr;
pmix_value_t value;
pmix_value_t *val_p;
pmix_value_t *addr_vals = NULL;
unsigned int nprocs;
unsigned int nprocs = 0;
char key[512];
pmix_info_t *info;
bool flag;
const char **addr_strs = NULL;
unsigned int n;
size_t i;
int match;
ssg_member_id_t *ids;
pmix_status_t ret;
ssg_group_id_t g_id = SSG_GROUP_ID_INVALID;
......@@ -402,25 +404,72 @@ ssg_group_id_t ssg_group_create_pmix(
mid_state = ssg_retrieve_mid_state(mid);
if(!mid_state) goto fini;
/* XXX this is broken with multiple mids */
#if 0
/* XXX config switch for this functionality */
ret = PMIx_Get(&tmp_proc, WHAT,, , &val_p)
if (WHAT)
/* we need to store a mapping of PMIx ranks to SSG member IDs so that
* if we later receive notice of a PMIx rank failure we know how to
* map to affected SSG group members
*/
snprintf(key, 512, "ssg-%s-%d-id", proc.nspace, proc.rank);
PMIX_INFO_CREATE(info, 1);
flag = true;
PMIX_INFO_LOAD(info, PMIX_IMMEDIATE, &flag, PMIX_BOOL);
ret = PMIx_Get(&proc, key, info, 1, &val_p);
PMIX_INFO_FREE(info, 1);
if (ret != PMIX_SUCCESS)
{
/* exchange information needed to map PMIx ranks to SSG member IDs */
snprintf(key, 512, "ssg-%s-%d-id", proc.nspace, proc.rank);
PMIX_VALUE_LOAD(&value, &mid_state->self_id, PMIX_UINT64);
/* no key present, add the rank mapping for the first time */
my_ids_array.type = PMIX_UINT64;
my_ids_array.size = 1;
my_ids_array.array = &mid_state->self_id;
PMIX_VALUE_LOAD(&value, &my_ids_array, PMIX_DATA_ARRAY);
ret = PMIx_Put(PMIX_GLOBAL, key, &value);
if (ret != PMIX_SUCCESS)
fprintf(stderr, "Warning: unable to store PMIx rank->ID mapping for"\
"SSG member %lu\n", mid_state->self_id);
}
else
{
/* rank mapping found, see if we need to add this self ID to it */
tmp_id_array_ptr = val_p->data.darray;
if (tmp_id_array_ptr && (tmp_id_array_ptr->type == PMIX_UINT64))
{
match = 0;
ids = (ssg_member_id_t *)tmp_id_array_ptr->array;
for (i = 0; i < tmp_id_array_ptr->size; i++)
{
if (ids[i] == mid_state->self_id)
{
match = 1;
fprintf(stderr, "FOUND OLD MID\n");
break;
}
}
if (!match)
{
/* update existing mapping to include this self ID */
ids = malloc((tmp_id_array_ptr->size + 1) * sizeof(*ids));
if (!ids) goto fini;
memcpy(ids, tmp_id_array_ptr->array,
tmp_id_array_ptr->size * sizeof(*ids));
ids[tmp_id_array_ptr->size + 1] = mid_state->self_id;
my_ids_array.type = PMIX_UINT64;
my_ids_array.size = tmp_id_array_ptr->size + 1;
my_ids_array.array = ids;
PMIX_VALUE_LOAD(&value, &my_ids_array, PMIX_DATA_ARRAY);
ret = PMIx_Put(PMIX_GLOBAL, key, &value);
free(ids);
fprintf(stderr, "ADDED NEW MID\n");
if (ret != PMIX_SUCCESS)
fprintf(stderr, "Warning: unable to store PMIx rank->ID mapping for"\
"SSG member %lu\n", mid_state->self_id);
}
}
else
{
fprintf(stderr, "Warning: skipping PMIx event notification registration -- "\
"Unable to put PMIx rank mapping\n");
PMIx_Deregister_event_handler(ssg_inst->pmix_failure_evhdlr_ref, NULL, NULL);
fprintf(stderr, "Warning: unexpected format for PMIx rank->ID mapping\n");
}
PMIX_VALUE_RELEASE(val_p);
}
PMIX_VALUE_RELEASE(val_p);
#endif
/* XXX note we are assuming every process in the job wants to join this group... */
/* get the total nprocs in the job */
......@@ -445,8 +494,8 @@ ssg_group_id_t ssg_group_create_pmix(
flag = true;
PMIX_INFO_LOAD(info, PMIX_COLLECT_DATA, &flag, PMIX_BOOL);
ret = PMIx_Fence(&proc, 1, info, 1);
if (ret != PMIX_SUCCESS) goto fini;
PMIX_INFO_FREE(info, 1);
if (ret != PMIX_SUCCESS) goto fini;
addr_strs = malloc(nprocs * sizeof(*addr_strs));
if (addr_strs == NULL) goto fini;
......@@ -1961,15 +2010,18 @@ static int ssg_get_group_member_rank_internal(
}
#ifdef SSG_HAVE_PMIX
// XXX BROKEN
void ssg_pmix_proc_failure_notify_fn(
size_t evhdlr_registration_id, pmix_status_t status, const pmix_proc_t *source,
pmix_info_t info[], size_t ninfo, pmix_info_t results[], size_t nresults,
pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata)
{
char key[512];
pmix_value_t ssg_id_val;
bool flag;
pmix_info_t *get_info;
pmix_value_t *val_p;
pmix_data_array_t *id_array_ptr;
ssg_member_id_t *ids;
size_t i;
pmix_status_t ret;
ssg_group_descriptor_t *g_desc, *g_desc_tmp;
ssg_member_update_t fail_update;
......@@ -1977,9 +2029,11 @@ void ssg_pmix_proc_failure_notify_fn(
assert(status == PMIX_PROC_TERMINATED || status == PMIX_ERR_PROC_ABORTED);
snprintf(key, 512, "ssg-%s-%d-id", source->nspace, source->rank);
val_p = &ssg_id_val;
PMIX_VALUE_CONSTRUCT(val_p);
ret = PMIx_Get(source, key, NULL, 0, &val_p);
PMIX_INFO_CREATE(get_info, 1);
flag = true;
PMIX_INFO_LOAD(get_info, PMIX_IMMEDIATE, &flag, PMIX_BOOL);
ret = PMIx_Get(source, key, get_info, 1, &val_p);
PMIX_INFO_FREE(get_info, 1);
if (ret != PMIX_SUCCESS)
{
fprintf(stderr, "Warning: unable to retrieve PMIx rank mapping for rank %d\n",
......@@ -1987,16 +2041,33 @@ void ssg_pmix_proc_failure_notify_fn(
}
else
{
/* remove this member from any group its a member of */
fail_update.type = SSG_MEMBER_DIED;
fail_update.u.member_id = val_p->data.uint64;
// XXX LOCKING
HASH_ITER(hh, ssg_inst->g_desc_table, g_desc, g_desc_tmp)
id_array_ptr = val_p->data.darray;
if (id_array_ptr && (id_array_ptr->type == PMIX_UINT64))
{
ids = (ssg_member_id_t *)id_array_ptr->array;
fail_update.type = SSG_MEMBER_DIED;
/* iterate all SSG member IDs associated with the failed PMIx rank */
ABT_rwlock_rdlock(ssg_rt->lock);
for (i = 0; i < id_array_ptr->size; i++)
{
/* remove this member from any group its a member of */
fail_update.u.member_id = ids[i];
HASH_ITER(hh, ssg_rt->g_desc_table, g_desc, g_desc_tmp)
{
SSG_DEBUG(g_desc->g_data.g, "RECEIVED FAIL UPDATE FOR MEMBER %lu\n",
fail_update.u.member_id);
ssg_apply_member_updates(g_desc->g_data.g, &fail_update, 1);
}
}
ABT_rwlock_unlock(ssg_rt->lock);
}
else
{
SSG_DEBUG(g_desc->g_data.g, "RECEIVED FAIL UPDATE FOR MEMBER %lu\n",
fail_update.u.member_id);
ssg_apply_member_updates(g_desc->g_data.g, &fail_update, 1);
fprintf(stderr, "Warning: unexpected format for PMIx rank->ID mapping\n");
}
PMIX_VALUE_RELEASE(val_p);
}
/* execute PMIx event notification callback */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment