Commit a9853488 authored by Shane Snyder's avatar Shane Snyder

Implemented and tested PMIx failure notification

parent 345820bf
......@@ -183,11 +183,11 @@ int ssg_group_detach(
/**
* Obtains the caller's member ID in the given SSG group.
*
* @param[in] group_id SSG group ID
* @param[in] mid Corresponding Margo instance identifier
* @returns caller's member ID on success, SSG_MEMBER_ID_INVALID otherwise
*/
ssg_member_id_t ssg_get_group_self_id(
ssg_group_id_t group_id);
ssg_member_id_t ssg_get_self_id(
margo_instance_id mid);
/**
* Obtains the size of a given SSG group.
......@@ -205,7 +205,7 @@ int ssg_get_group_size(
* @param[in] member_id SSG group member ID
* @returns HG address of given group member on success, HG_ADDR_NULL otherwise
*/
hg_addr_t ssg_get_addr(
hg_addr_t ssg_get_group_addr(
ssg_group_id_t group_id,
ssg_member_id_t member_id);
......
......@@ -6,6 +6,8 @@
#pragma once
#include "ssg-config.h"
#include <stdint.h>
#include <inttypes.h>
......@@ -25,34 +27,12 @@ extern "C" {
#define SSG_MAGIC_NR 17321588
/*
 * Look up the caller's own Margo address and hand it back as a freshly
 * malloc'd string.
 *
 * __mid      - Margo instance to query
 * __addr_str - char* lvalue; receives the allocated string on success (the
 *              caller is responsible for free()ing it) or NULL if any step
 *              fails
 *
 * Both arguments are expanded more than once, so pass expressions without
 * side effects.
 */
#define SSG_GET_SELF_ADDR_STR(__mid, __addr_str) do { \
    hg_addr_t __ssg_self; \
    hg_size_t __ssg_len; \
    __addr_str = NULL; \
    if (margo_addr_self(__mid, &__ssg_self) != HG_SUCCESS) break; \
    /* first call passes a NULL buffer to learn the required size */ \
    if (margo_addr_to_string(__mid, NULL, &__ssg_len, __ssg_self) == HG_SUCCESS) { \
        __addr_str = malloc(__ssg_len); \
        if (__addr_str != NULL && \
            margo_addr_to_string(__mid, __addr_str, &__ssg_len, __ssg_self) != HG_SUCCESS) { \
            free(__addr_str); \
            __addr_str = NULL; \
        } \
    } \
    /* once obtained, the address handle is released on every path */ \
    margo_addr_free(__mid, __ssg_self); \
} while(0)
/* debug printing macro for SSG */
#ifdef DEBUG
/*
 * SSG_DEBUG(__g, __fmt, ...): write a debug record to __g->dbg_log.
 *
 * Each record carries the ABT_get_wtime() timestamp, a 64-bit member ID
 * (printed with PRIu64), the group name, and then the caller's printf-style
 * message built from __fmt and the variadic arguments. The stream is flushed
 * after writing.
 *
 * NOTE(review): two fprintf calls appear back to back, differing in the
 * timestamp formatting and in where the self ID is read from (__g->self_id
 * vs. __g->ssg_inst->self_id). This looks like the old and new lines of a
 * diff shown together -- confirm which one is intended.
 */
#define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \
fprintf(__g->dbg_log, "[%.6lf] %20"PRIu64" (%s): " __fmt, __now, \
__g->self_id, __g->name, ## __VA_ARGS__); \
fprintf(__g->dbg_log, "%.6lf %20"PRIu64" (%s): " __fmt, __now, \
__g->ssg_inst->self_id, __g->name, ## __VA_ARGS__); \
fflush(__g->dbg_log); \
} while(0)
#else
......@@ -62,6 +42,19 @@ extern "C" {
/* SSG internal datatypes */
/*
 * State shared by SSG groups; ssg_group and ssg_attached_group each hold a
 * pointer to one of these in their ssg_inst field.
 */
typedef struct ssg_instance
{
/* Margo instance identifier */
margo_instance_id mid;
/* this instance's own address in string form; used as addr_str in join
 * requests and written first when a group is serialized */
char *self_addr_str;
/* this instance's member ID; used as the source ID of SWIM messages */
ssg_member_id_t self_id;
/* NOTE(review): presumably the head of a hash table of groups this
 * instance is a member of -- confirm */
struct ssg_group *group_table;
/* NOTE(review): presumably the head of a hash table of groups this
 * instance has attached to -- confirm */
struct ssg_attached_group *attached_group_table;
#ifdef SSG_HAVE_PMIX
/* only present when built with SSG_HAVE_PMIX.
 * NOTE(review): presumably the reference for the registered PMIx failure
 * event handler -- confirm */
size_t pmix_failure_evhdlr_ref;
#endif
/* reader-writer lock; NOTE(review): presumably guards the fields above --
 * confirm */
ABT_rwlock lock;
} ssg_instance_t;
/* TODO: associate a version number with a descriptor? */
typedef struct ssg_group_descriptor
{
......@@ -72,6 +65,13 @@ typedef struct ssg_group_descriptor
int ref_count;
} ssg_group_descriptor_t;
/*
 * Owner status values for a group descriptor.
 * NOTE(review): the names suggest the holder of a descriptor is either not
 * yet associated with the group, a member of it, or attached to it --
 * confirm against the code that sets and checks these values.
 */
enum ssg_group_descriptor_owner_status
{
/* explicitly zero, so a zero-initialized status reads as unassociated */
SSG_OWNER_IS_UNASSOCIATED = 0,
SSG_OWNER_IS_MEMBER,
SSG_OWNER_IS_ATTACHER
};
typedef struct ssg_member_state
{
ssg_member_id_t id;
......@@ -81,16 +81,6 @@ typedef struct ssg_member_state
UT_hash_handle hh;
} ssg_member_state_t;
/*
 * A single group membership update: an update type plus either a member
 * address string or a member ID.
 * NOTE(review): presumably `type` determines which union member is valid --
 * confirm against the code that produces and consumes these updates.
 */
typedef struct ssg_member_update
{
/* kind of update being described */
ssg_member_update_type_t type;
union
{
/* member identified by address string */
char *member_addr_str;
/* member identified by SSG member ID */
ssg_member_id_t member_id;
} u;
} ssg_member_update_t;
typedef struct ssg_group_view
{
unsigned int size;
......@@ -100,7 +90,7 @@ typedef struct ssg_group_view
typedef struct ssg_group
{
char *name;
ssg_member_id_t self_id;
ssg_instance_t *ssg_inst;
ssg_group_view_t view;
ssg_member_state_t *dead_members;
ssg_group_descriptor_t *descriptor;
......@@ -117,26 +107,22 @@ typedef struct ssg_group
/*
 * Record for an attached group.
 * NOTE(review): presumably a group this process observes without being a
 * member of it -- confirm.
 */
typedef struct ssg_attached_group
{
/* group name */
char *name;
/* SSG instance this record refers back to */
ssg_instance_t *ssg_inst;
/* group view (size and member map) */
ssg_group_view_t view;
/* descriptor for the group */
ssg_group_descriptor_t *descriptor;
/* reader-writer lock; NOTE(review): presumably guards this record --
 * confirm */
ABT_rwlock lock;
/* uthash handle, allowing this record to be stored in a hash table */
UT_hash_handle hh;
} ssg_attached_group_t;
/*
 * SSG instance state: a Margo instance, the two group tables, and a lock.
 * NOTE(review): a second definition of ssg_instance with extra fields
 * (self_addr_str, self_id, optional PMIx handler reference) also appears in
 * this text; this one looks like the earlier layout -- confirm which is
 * current.
 */
typedef struct ssg_instance
{
/* Margo instance identifier */
margo_instance_id mid;
/* table of ssg_group_t entries */
ssg_group_t *group_table;
/* table of ssg_attached_group_t entries */
ssg_attached_group_t *attached_group_table;
/* reader-writer lock; NOTE(review): presumably guards the tables --
 * confirm */
ABT_rwlock lock;
} ssg_instance_t;
enum ssg_group_descriptor_owner_status
typedef struct ssg_member_update
{
SSG_OWNER_IS_UNASSOCIATED = 0,
SSG_OWNER_IS_MEMBER,
SSG_OWNER_IS_ATTACHER
};
ssg_member_update_type_t type;
union
{
char *member_addr_str;
ssg_member_id_t member_id;
} u;
} ssg_member_update_t;
/* SSG internal function prototypes */
......
......@@ -105,7 +105,6 @@ int ssg_group_join_send(
hg_bulk_t bulk_handle = HG_BULK_NULL;
void *tmp_view_buf = NULL, *b;
hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE;
char *self_addr_str = NULL;
ssg_group_join_request_t join_req;
ssg_group_join_response_t join_resp;
hg_return_t hret;
......@@ -119,9 +118,6 @@ int ssg_group_join_send(
ssg_group_join_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
if (!self_addr_str) goto fini;
/* allocate a buffer to try to store the group view in */
/* NOTE: We don't know if this buffer is big enough to store the complete
* view. If the buffer is not large enough, the group member we are
......@@ -137,7 +133,7 @@ int ssg_group_join_send(
/* send a join request to the given group member address */
/* XXX is the whole descriptor really needed? */
memcpy(&join_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
join_req.addr_str = self_addr_str;
join_req.addr_str = ssg_inst->self_addr_str;
join_req.bulk_handle = bulk_handle;
hret = margo_forward(handle, &join_req);
if (hret != HG_SUCCESS) goto fini;
......@@ -197,7 +193,6 @@ fini:
if (handle != HG_HANDLE_NULL) margo_destroy(handle);
if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
free(tmp_view_buf);
free(self_addr_str);
return sret;
}
......@@ -561,7 +556,6 @@ DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
static int ssg_group_serialize(
ssg_group_t *g, void **buf, hg_size_t *buf_size)
{
char *self_addr_str;
ssg_member_state_t *member_state, *tmp;
hg_size_t group_buf_size = 0;
void *group_buf;
......@@ -570,11 +564,8 @@ static int ssg_group_serialize(
*buf = NULL;
*buf_size = 0;
SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
if (!self_addr_str) return SSG_FAILURE;
/* first determine size */
group_buf_size = strlen(self_addr_str) + 1;
group_buf_size = strlen(ssg_inst->self_addr_str) + 1;
HASH_ITER(hh, g->view.member_map, member_state, tmp)
{
group_buf_size += strlen(member_state->addr_str) + 1;
......@@ -583,13 +574,12 @@ static int ssg_group_serialize(
group_buf = malloc(group_buf_size);
if(!group_buf)
{
free(self_addr_str);
return SSG_FAILURE;
}
buf_p = group_buf;
strcpy(buf_p, self_addr_str);
buf_p += strlen(self_addr_str) + 1;
strcpy(buf_p, ssg_inst->self_addr_str);
buf_p += strlen(ssg_inst->self_addr_str) + 1;
HASH_ITER(hh, g->view.member_map, member_state, tmp)
{
str_p = member_state->addr_str;
......@@ -599,7 +589,6 @@ static int ssg_group_serialize(
*buf = group_buf;
*buf_size = group_buf_size;
free(self_addr_str);
return SSG_SUCCESS;
}
......
This diff is collapsed.
......@@ -24,7 +24,7 @@ extern "C" {
#define SWIM_DEF_SUBGROUP_SIZE 2
#define SWIM_MAX_SUBGROUP_SIZE 5
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
#define SWIM_MAX_PIGGYBACK_TX_COUNT 5
typedef struct swim_ping_target_list
{
......
......@@ -116,8 +116,11 @@ void swim_dping_req_send_ult(
{
ssg_group_t *group = (ssg_group_t *)t_arg;
assert(group != NULL);
assert(group->swim_ctx != NULL);
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM dping req send error -- invalid group state\n");
return;
}
/* send the dping req, ignoring retval since we can't return it from a ULT */
swim_dping_req_send(group, group->swim_ctx->dping_target_id,
......@@ -175,8 +178,11 @@ static void swim_dping_req_recv_ult(
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_dping_req_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM dping req recv error -- invalid group state\n");
return;
}
hret = margo_get_input(handle, &dping_req);
if(hret != HG_SUCCESS)
......@@ -234,8 +240,11 @@ static void swim_dping_ack_recv_ult(
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_dping_ack_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM dping ack recv error -- invalid group state\n");
return;
}
hret = margo_get_input(handle, &dping_ack);
if(hret != HG_SUCCESS)
......@@ -337,9 +346,12 @@ void swim_iping_req_send_ult(
swim_iping_req_t iping_req;
hg_return_t hret;
assert(group != NULL);
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM iping req send error -- invalid group state\n");
return;
}
swim_ctx = group->swim_ctx;
assert(swim_ctx != NULL);
ABT_rwlock_wrlock(group->lock);
iping_target_id = swim_ctx->iping_target_ids[swim_ctx->iping_target_ndx];
......@@ -387,8 +399,11 @@ static void swim_iping_req_recv_ult(hg_handle_t handle)
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_iping_req_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM iping req recv error -- invalid group state\n");
return;
}
hret = margo_get_input(handle, &iping_req);
if(hret != HG_SUCCESS)
......@@ -453,8 +468,11 @@ static void swim_iping_ack_recv_ult(hg_handle_t handle)
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_iping_ack_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM iping ack recv error -- invalid group state\n");
return;
}
hret = margo_get_input(handle, &iping_ack);
if(hret != HG_SUCCESS)
......@@ -493,7 +511,7 @@ static void swim_pack_message(ssg_group_t *group, swim_message_t *msg)
memset(msg, 0, sizeof(*msg));
/* fill in self information */
msg->source_id = group->self_id;
msg->source_id = group->ssg_inst->self_id;
msg->source_inc_nr = group->swim_ctx->self_inc_nr;
/* piggyback SWIM & SSG updates on the message */
......@@ -606,6 +624,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
}
for(i = 0; i < msg->swim_pb_buf_count; i++)
{
memset(&(msg->swim_pb_buf[i]), 0, sizeof(msg->swim_pb_buf[i]));
hret = hg_proc_swim_member_update_t(proc, &(msg->swim_pb_buf[i]));
if(hret != HG_SUCCESS)
{
......
......@@ -349,6 +349,7 @@ static void swim_get_dping_target(
ssg_member_state_t *tmp_ms;
hg_return_t hret;
*target_id = SSG_MEMBER_ID_INVALID;
ABT_rwlock_wrlock(group->lock);
/* find dping target */
......@@ -873,12 +874,12 @@ void swim_apply_member_updates(
{
case SWIM_MEMBER_ALIVE:
/* ignore alive updates for self */
if(updates[i].id != group->self_id)
if(updates[i].id != group->ssg_inst->self_id)
swim_process_alive_member_update(group, updates[i].id,
updates[i].state.inc_nr);
break;
case SWIM_MEMBER_SUSPECT:
if(updates[i].id == group->self_id)
if(updates[i].id == group->ssg_inst->self_id)
{
/* increment our incarnation number if we are suspected
* in the current incarnation
......@@ -898,7 +899,7 @@ void swim_apply_member_updates(
break;
case SWIM_MEMBER_DEAD:
/* if we get an update that we are dead, just shut down */
if(updates[i].id == group->self_id)
if(updates[i].id == group->ssg_inst->self_id)
{
SSG_DEBUG(group, "SWIM self confirmed DEAD (inc_nr=%u)\n",
updates[i].state.inc_nr);
......@@ -912,7 +913,7 @@ void swim_apply_member_updates(
}
break;
default:
fprintf(stderr, "Error: invalid SWIM member update\n");
fprintf(stderr, "Error: invalid SWIM member update [%lu,%d]\n", group->ssg_inst->self_id,updates[i].state.status);
break;
}
}
......@@ -952,6 +953,7 @@ int swim_apply_ssg_member_update(
break;
case SSG_MEMBER_LEFT:
case SSG_MEMBER_DIED:
/* just mark as dead, this member will be cleaned from ping target
* list on the next re-shuffle
*/
......
......@@ -228,7 +228,7 @@ int main(int argc, char **argv)
assert(ssg_get_group_size(gid) == 2);
self = ssg_get_group_self_id(gid);
self = ssg_get_self_id(gid);
if(self == 1)
{
......@@ -538,7 +538,7 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
for(i=0; i<(g_buffer_size/sizeof(i)); i++)
((hg_size_t*)buffer)[i] = i;
target_addr = ssg_get_addr(gid, target);
target_addr = ssg_get_group_addr(gid, target);
assert(target_addr != HG_ADDR_NULL);
ret = margo_create(mid, target_addr, id, &handle);
......
......@@ -182,7 +182,7 @@ int main(int argc, char **argv)
assert(ssg_get_group_size(gid) == 2);
self = ssg_get_group_self_id(gid);
self = ssg_get_self_id(gid);
#if 0
printf("MPI rank %d has SSG ID %lu\n", rank, self);
#endif
......@@ -332,7 +332,7 @@ static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
int ret;
double tm1, tm2;
target_addr = ssg_get_addr(gid, target);
target_addr = ssg_get_group_addr(gid, target);
assert(target_addr != HG_ADDR_NULL);
ret = margo_create(mid, target_addr, id, &handle);
......
......@@ -191,7 +191,7 @@ int main(int argc, char *argv[])
margo_thread_sleep(mid, opts.shutdown_time * 1000.0);
/* get my group id and the size of the group */
my_id = ssg_get_group_self_id(g_id);
my_id = ssg_get_self_id(mid);
DIE_IF(my_id == SSG_MEMBER_ID_INVALID, "ssg_get_group_self_id");
group_size = ssg_get_group_size(g_id);
DIE_IF(group_size == 0, "ssg_get_group_size");
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment