Commit 96c88048 authored by Shane Snyder's avatar Shane Snyder

port join/leave over to recent changes

parent 3327958e
......@@ -132,16 +132,16 @@ int ssg_group_destroy(
/**
* Adds the calling process to an SSG group.
*
* @param[in] in_group_id Input SSG group ID
* @param[in] group_id Input SSG group ID
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier for joined group on success, SSG_GROUP_ID_NULL otherwise
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: Use the returned group ID to refer to the group, as the input group ID
* becomes stale after the join is completed.
*/
ssg_group_id_t ssg_group_join(
ssg_group_id_t in_group_id,
int ssg_group_join(
ssg_group_id_t group_id,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
......@@ -249,6 +249,7 @@ void ssg_group_id_deserialize(
*
* @param[in] file_name File to store the group ID in
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_id_store(
const char * file_name,
......@@ -259,6 +260,7 @@ int ssg_group_id_store(
*
* @param[in] file_name File to store the group ID in
* @param[out] group_id_p Pointer to store group identifier in
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_id_load(
const char * file_name,
......
......@@ -90,7 +90,6 @@ void ssg_register_rpcs()
return;
}
#if 0
/* ssg_group_join_send
*
*
......@@ -197,7 +196,6 @@ fini:
return sret;
}
#endif
static void ssg_group_join_recv_ult(
hg_handle_t handle)
......@@ -225,11 +223,14 @@ static void ssg_group_join_recv_ult(
if (hret != HG_SUCCESS) goto fini;
view_size_requested = margo_bulk_get_size(join_req.bulk_handle);
ABT_rwlock_rdlock(ssg_inst->lock);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->g_desc_table, &join_req.group_descriptor.g_id,
sizeof(uint64_t), g_desc);
if (!g_desc)
{
ABT_rwlock_unlock(ssg_inst->lock);
margo_free_input(handle, &join_req);
goto fini;
}
......@@ -238,10 +239,13 @@ static void ssg_group_join_recv_ult(
sret = ssg_group_serialize(g_desc->g_data.g, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
ABT_rwlock_unlock(ssg_inst->lock);
margo_free_input(handle, &join_req);
goto fini;
}
ABT_rwlock_unlock(ssg_inst->lock);
if (view_size_requested >= view_buf_size)
{
/* if joiner's buf is large enough, transfer the view */
......@@ -286,7 +290,6 @@ fini:
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult)
#if 0
/* ssg_group_leave_send
*
*
......@@ -323,7 +326,6 @@ fini:
return sret;
}
#endif
static void ssg_group_leave_recv_ult(
hg_handle_t handle)
......@@ -345,16 +347,21 @@ static void ssg_group_leave_recv_ult(
hret = margo_get_input(handle, &leave_req);
if (hret != HG_SUCCESS) goto fini;
ABT_rwlock_rdlock(ssg_inst->lock);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->g_desc_table, &leave_req.group_descriptor.g_id,
sizeof(uint64_t), g_desc);
if (!g_desc)
{
ABT_rwlock_unlock(ssg_inst->lock);
margo_free_input(handle, &leave_req);
goto fini;
}
assert(g_desc->owner_status == SSG_OWNER_IS_MEMBER);
ABT_rwlock_unlock(ssg_inst->lock);
/* apply group leave locally */
leave_update.type = SSG_MEMBER_LEFT;
leave_update.u.member_id = leave_req.member_id;
......@@ -407,9 +414,9 @@ int ssg_group_observe_send(
ssg_group_observe_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
/* allocate a buffer to try to store the group view in */
/* allocate a buffer of the given size to try to store the group view in */
/* NOTE: We don't know if this buffer is big enough to store the complete
* view. If the buffer is not large enough, the group member we are
* view. If the buffers is not large enough, the group member we are
* sending to will send a NACK indicating the necessary buffer size
*/
tmp_view_buf = malloc(tmp_view_buf_size);
......@@ -546,7 +553,7 @@ static void ssg_group_observe_recv_ult(
/* set the response and send back */
observe_resp.group_name = g_desc->g_data.g->name;
observe_resp.group_size = (int)g_desc->g_data.g->view.size;
observe_resp.group_size = (int)g_desc->g_data.g->view.size+1;
observe_resp.view_buf_size = view_buf_size;
margo_respond(handle, &observe_resp);
......
......@@ -151,7 +151,6 @@ int ssg_finalize()
if (!ssg_inst)
return SSG_FAILURE;
/* destroy all active groups */
ABT_rwlock_wrlock(ssg_inst->lock);
HASH_ITER(hh, ssg_inst->g_desc_table, g_desc, g_desc_tmp)
......@@ -487,45 +486,63 @@ int ssg_group_destroy(
return SSG_SUCCESS;
}
#if 0
ssg_group_id_t ssg_group_join(
ssg_group_id_t in_group_id,
int ssg_group_join(
ssg_group_id_t group_id,
ssg_membership_update_cb update_cb,
void * update_cb_dat)
{
ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id;
ssg_group_descriptor_t *g_desc;
hg_addr_t group_target_addr = HG_ADDR_NULL;
char *group_name = NULL;
int group_size;
void *view_buf = NULL;
const char **addr_strs = NULL;
ssg_group_id_t create_g_id;
hg_return_t hret;
int sret;
ssg_group_t *g = NULL;
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
int sret = SSG_FAILURE;
if (!ssg_inst || group_id == SSG_GROUP_ID_INVALID) goto fini;
ABT_rwlock_wrlock(ssg_inst->lock);
if (!ssg_inst || in_group_id == SSG_GROUP_ID_NULL) goto fini;
/* find the group structure to join */
HASH_FIND(hh, ssg_inst->g_desc_table, &group_id, sizeof(ssg_group_id_t), g_desc);
if (!g_desc)
{
fprintf(stderr, "Error: SSG unable to find expected group ID\n");
ABT_rwlock_unlock(ssg_inst->lock);
goto fini;
}
if (in_group_descriptor->owner_status == SSG_OWNER_IS_MEMBER)
if (g_desc->owner_status == SSG_OWNER_IS_MEMBER)
{
fprintf(stderr, "Error: SSG unable to join a group it is already a member of\n");
ABT_rwlock_unlock(ssg_inst->lock);
goto fini;
}
else if (in_group_descriptor->owner_status == SSG_OWNER_IS_OBSERVER)
else if (g_desc->owner_status == SSG_OWNER_IS_OBSERVER)
{
fprintf(stderr, "Error: SSG unable to join a group it is an observer of\n");
ABT_rwlock_unlock(ssg_inst->lock);
goto fini;
}
/* remove the descriptor since we re-add it as part of group creation */
HASH_DELETE(hh, ssg_inst->g_desc_table, g_desc);
ABT_rwlock_unlock(ssg_inst->lock);
/* lookup the address of the target group member in the GID */
hret = margo_addr_lookup(ssg_inst->mid, in_group_descriptor->addr_str,
&group_target_addr);
hret = margo_addr_lookup(ssg_inst->mid, g_desc->addr_str, &group_target_addr);
if (hret != HG_SUCCESS) goto fini;
sret = ssg_group_join_send(in_group_descriptor, group_target_addr,
sret = ssg_group_join_send(g_desc, group_target_addr,
&group_name, &group_size, &view_buf);
if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini;
/* free old descriptor */
ssg_group_descriptor_free(g_desc);
/* set up address string array for all group members */
addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size);
if (!addr_strs) goto fini;
......@@ -535,11 +552,13 @@ ssg_group_id_t ssg_group_join(
if(!addr_strs) goto fini;
addr_strs[group_size++] = ssg_inst->self_addr_str;
g = ssg_group_create_internal(group_name, addr_strs, group_size,
create_g_id = ssg_group_create_internal(group_name, addr_strs, group_size,
update_cb, update_cb_dat);
if (g)
if (create_g_id != SSG_GROUP_ID_INVALID)
{
g_id = (ssg_group_id_t)g->descriptor;
assert(create_g_id == group_id);
sret = SSG_SUCCESS;
/* don't free on success */
group_name = NULL;
......@@ -552,37 +571,43 @@ fini:
free(view_buf);
free(group_name);
return g_id;
return sret;
}
int ssg_group_leave(
ssg_group_id_t group_id)
{
ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
ssg_group_t *g = NULL;
ssg_group_descriptor_t *g_desc;
hg_addr_t group_target_addr = HG_ADDR_NULL;
hg_return_t hret;
int sret = SSG_FAILURE;
if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini;
if (!ssg_inst || group_id == SSG_GROUP_ID_INVALID) goto fini;
if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
ABT_rwlock_wrlock(ssg_inst->lock);
/* find the group structure to leave */
HASH_FIND(hh, ssg_inst->g_desc_table, &group_id, sizeof(ssg_group_id_t), g_desc);
if (!g_desc)
{
fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n");
ABT_rwlock_unlock(ssg_inst->lock);
fprintf(stderr, "Error: SSG unable to find expected group ID\n");
goto fini;
}
ABT_rwlock_rdlock(ssg_inst->lock);
HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
sizeof(uint64_t), g);
if (!g)
if (g_desc->owner_status != SSG_OWNER_IS_MEMBER)
{
fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n");
ABT_rwlock_unlock(ssg_inst->lock);
goto fini;
}
/* remove the descriptor */
HASH_DELETE(hh, ssg_inst->g_desc_table, g_desc);
/* send the leave req to the first member in the view */
hret = margo_addr_dup(ssg_inst->mid, g->view.member_map->addr, &group_target_addr);
hret = margo_addr_dup(ssg_inst->mid, g_desc->g_data.g->view.member_map->addr,
&group_target_addr);
if (hret != HG_SUCCESS)
{
ABT_rwlock_unlock(ssg_inst->lock);
......@@ -590,25 +615,16 @@ int ssg_group_leave(
}
ABT_rwlock_unlock(ssg_inst->lock);
sret = ssg_group_leave_send(group_descriptor, ssg_inst->self_id, group_target_addr);
sret = ssg_group_leave_send(g_desc, ssg_inst->self_id, group_target_addr);
if (sret != SSG_SUCCESS) goto fini;
/* at least one group member knows of the leave request -- safe to
* shutdown the group locally
*/
/* re-lookup the group as we don't hold the lock while sending the leave req */
ABT_rwlock_wrlock(ssg_inst->lock);
HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
sizeof(uint64_t), g);
if (g)
{
HASH_DELETE(hh, ssg_inst->group_table, g);
ABT_rwlock_unlock(ssg_inst->lock);
ssg_group_destroy_internal(g);
}
else
ABT_rwlock_unlock(ssg_inst->lock);
/* destroy group and free old descriptor */
ssg_group_destroy_internal(g_desc->g_data.g);
ssg_group_descriptor_free(g_desc);
sret = SSG_SUCCESS;
......@@ -618,7 +634,6 @@ fini:
return sret;
}
#endif
int ssg_group_observe(
ssg_group_id_t group_id)
......@@ -639,19 +654,21 @@ int ssg_group_observe(
HASH_FIND(hh, ssg_inst->g_desc_table, &group_id, sizeof(ssg_group_id_t), g_desc);
if (!g_desc)
{
ABT_rwlock_unlock(ssg_inst->lock);
fprintf(stderr, "Error: SSG unable to find expected group ID\n");
ABT_rwlock_unlock(ssg_inst->lock);
return SSG_FAILURE;
}
if (g_desc->owner_status == SSG_OWNER_IS_MEMBER)
{
fprintf(stderr, "Error: SSG unable to observe a group it is a member of\n");
ABT_rwlock_unlock(ssg_inst->lock);
goto fini;
}
else if (g_desc->owner_status == SSG_OWNER_IS_OBSERVER)
{
fprintf(stderr, "Error: SSG unable to observe a group it is already observing\n");
ABT_rwlock_unlock(ssg_inst->lock);
goto fini;
}
......@@ -672,6 +689,7 @@ int ssg_group_observe(
if (!og) goto fini;
memset(og, 0, sizeof(*og));
og->name = strdup(group_name);
ABT_rwlock_create(&og->lock);
/* create the view for the group */
sret = ssg_group_view_create(addr_strs, group_size, NULL, og->lock, &og->view);
......@@ -681,7 +699,6 @@ int ssg_group_observe(
ABT_rwlock_wrlock(ssg_inst->lock);
g_desc->owner_status = SSG_OWNER_IS_OBSERVER;
g_desc->g_data.og = og;
HASH_ADD(hh, ssg_inst->g_desc_table, g_id, sizeof(ssg_group_id_t), g_desc);
ABT_rwlock_unlock(ssg_inst->lock);
sret = SSG_SUCCESS;
......@@ -711,15 +728,15 @@ int ssg_group_unobserve(
HASH_FIND(hh, ssg_inst->g_desc_table, &group_id, sizeof(ssg_group_id_t), g_desc);
if (!g_desc)
{
ABT_rwlock_unlock(ssg_inst->lock);
fprintf(stderr, "Error: SSG unable to find expected group ID\n");
ABT_rwlock_unlock(ssg_inst->lock);
return SSG_FAILURE;
}
if (g_desc->owner_status != SSG_OWNER_IS_OBSERVER)
{
ABT_rwlock_unlock(ssg_inst->lock);
fprintf(stderr, "Error: SSG unable to unobserve group that was never observed\n");
ABT_rwlock_unlock(ssg_inst->lock);
return SSG_FAILURE;
}
HASH_DELETE(hh, ssg_inst->g_desc_table, g_desc);
......@@ -1109,7 +1126,7 @@ void ssg_group_dump(
ABT_rwlock_rdlock(g_desc->g_data.og->lock);
group_view = &g_desc->g_data.og->view;
group_lock = g_desc->g_data.og->lock;
group_size = g_desc->g_data.og->view.size + 1;
group_size = g_desc->g_data.og->view.size;
group_name = g_desc->g_data.og->name;
strcpy(group_role, "observer");
ABT_rwlock_unlock(g_desc->g_data.og->lock);
......
......@@ -3,6 +3,7 @@ TESTS_ENVIRONMENT += \
check_PROGRAMS += \
tests/ssg-launch-group \
tests/ssg-test-observe \
tests/ssg-join-leave-group
EXTRA_DIST += \
......
......@@ -224,11 +224,11 @@ int main(int argc, char **argv)
ret = ssg_init(mid);
assert(ret == 0);
gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
assert(gid != SSG_GROUP_ID_NULL);
assert(gid != SSG_GROUP_ID_INVALID);
assert(ssg_get_group_size(gid) == 2);
self = ssg_get_self_id(gid);
self = ssg_get_self_id(mid);
if(self == 1)
{
......
......@@ -178,11 +178,11 @@ int main(int argc, char **argv)
ret = ssg_init(mid);
assert(ret == 0);
gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
assert(gid != SSG_GROUP_ID_NULL);
assert(gid != SSG_GROUP_ID_INVALID);
assert(ssg_get_group_size(gid) == 2);
self = ssg_get_self_id(gid);
self = ssg_get_self_id(mid);
#if 0
printf("MPI rank %d has SSG ID %lu\n", rank, self);
#endif
......
......@@ -112,8 +112,7 @@ int main(int argc, char *argv[])
{
struct group_join_leave_opts opts;
margo_instance_id mid = MARGO_INSTANCE_NULL;
ssg_group_id_t in_g_id = SSG_GROUP_ID_NULL;
ssg_group_id_t out_g_id = SSG_GROUP_ID_NULL;
ssg_group_id_t g_id = SSG_GROUP_ID_INVALID;
int sret;
/* set any default options (that may be overwritten by cmd args) */
......@@ -134,16 +133,15 @@ int main(int argc, char *argv[])
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
/* load GID from file */
ssg_group_id_load(opts.gid_file, &in_g_id);
ssg_group_id_load(opts.gid_file, &g_id);
/* sleep until time to join */
if (opts.join_time > 0)
margo_thread_sleep(mid, opts.join_time * 1000.0);
/* XXX do we want to use callback for testing anything about group??? */
out_g_id = ssg_group_join(in_g_id, NULL, NULL);
DIE_IF(out_g_id == SSG_GROUP_ID_NULL, "ssg_group_join");
ssg_group_id_free(in_g_id);
sret = ssg_group_join(g_id, NULL, NULL);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_join");
/* sleep for given duration to allow group time to run */
if (opts.leave_time >= 0)
......@@ -151,9 +149,9 @@ int main(int argc, char *argv[])
margo_thread_sleep(mid, (opts.leave_time - opts.join_time) * 1000.0);
/* dump group to see view prior to leaving */
ssg_group_dump(out_g_id);
ssg_group_dump(g_id);
sret = ssg_group_leave(out_g_id);
sret = ssg_group_leave(g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_leave");
margo_thread_sleep(mid, (opts.shutdown_time - opts.leave_time) * 1000.0);
......@@ -162,8 +160,8 @@ int main(int argc, char *argv[])
{
margo_thread_sleep(mid, (opts.shutdown_time - opts.join_time) * 1000.0);
ssg_group_dump(out_g_id);
ssg_group_destroy(out_g_id);
ssg_group_dump(g_id);
ssg_group_destroy(g_id);
}
ssg_finalize();
......
......@@ -37,11 +37,12 @@ static void usage()
{
fprintf(stderr,
"Usage: "
"ssg-test-observe [-s <time>] <addr> \n"
"ssg-test-observe [-s <time>] <addr> <GID>\n"
"Observe group given by GID using Mercury address ADDR.\n"
"\t-s <time> - time to sleep between SSG group operations\n");
}
static void parse_args(int argc, char *argv[], int *sleep_time, const char **addr_str)
static void parse_args(int argc, char *argv[], int *sleep_time, const char **addr_str, const char **gid_file)
{
int ndx = 1;
......@@ -50,7 +51,7 @@ static void parse_args(int argc, char *argv[], int *sleep_time, const char **add
exit(1);
#endif
if (argc < 2)
if (argc < 3)
{
usage();
exit(1);
......@@ -69,7 +70,8 @@ static void parse_args(int argc, char *argv[], int *sleep_time, const char **add
}
}
*addr_str = argv[ndx];
*addr_str = argv[ndx++];
*gid_file = argv[ndx++];
return;
}
......@@ -85,18 +87,11 @@ int main(int argc, char *argv[])
margo_instance_id mid = MARGO_INSTANCE_NULL;
int sleep_time = 0;
const char *addr_str;
const char *group_name = "simple_group";
const char *gid_file;
ssg_group_id_t g_id;
int group_id_forward_rpc_id;
int is_observer = 0;
hg_addr_t observer_addr;
char observer_addr_str[128];
hg_size_t observer_addr_str_sz = 128;
hg_handle_t handle = HG_HANDLE_NULL;
hg_return_t hret;
int sret;
parse_args(argc, argv, &sleep_time, &addr_str);
parse_args(argc, argv, &sleep_time, &addr_str, &gid_file);
#ifdef SSG_HAVE_MPI
MPI_Init(&argc, &argv);
......@@ -111,88 +106,12 @@ int main(int argc, char *argv[])
sret = ssg_init(mid);
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
/* register RPC for forwarding an SSG group identifier */
group_id_forward_rpc_id = MARGO_REGISTER(mid, "group_id_forward",
ssg_group_id_t, void, group_id_forward_recv_ult);
hret = margo_register_data(mid, group_id_forward_rpc_id, &g_id, NULL);
DIE_IF(hret != HG_SUCCESS, "margo_register_data");
sret = ssg_group_id_load(gid_file, &g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_id_load");
#ifdef SSG_HAVE_MPI
int my_world_rank;
int world_size;
int color;
MPI_Comm ssg_comm;
/* create a communicator for the SSG group */
/* NOTE: rank 0 will not be in the group and will instead observe
* as a client -- ranks 1:n-1 then represent the SSG group
*/
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
if (world_size < 2)
{
fprintf(stderr, "Error: MPI_COMM_WORLD must contain at least 2 processes\n");
exit(1);
}
MPI_Comm_rank(MPI_COMM_WORLD, &my_world_rank);
if (my_world_rank == 0)
{
is_observer = 1;
color = MPI_UNDEFINED;
}
else
{
color = 0;
}
MPI_Comm_split(MPI_COMM_WORLD, color, my_world_rank, &ssg_comm);
if (!is_observer)
{
g_id = ssg_group_create_mpi(group_name, ssg_comm, NULL, NULL);
DIE_IF(g_id == SSG_GROUP_ID_NULL, "ssg_group_create");
if (my_world_rank == 1)
{
MPI_Recv(observer_addr_str, 128, MPI_BYTE, 0, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
/* send the identifier for the created group back to the observer */
hret = margo_addr_lookup(mid, observer_addr_str, &observer_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_lookup");
hret = margo_create(mid, observer_addr, group_id_forward_rpc_id, &handle);
DIE_IF(hret != HG_SUCCESS, "margo_create");
hret = margo_forward(handle, &g_id);
DIE_IF(hret != HG_SUCCESS, "margo_forward");
margo_addr_free(mid, observer_addr);
margo_destroy(handle);
}
}
else
{
hret = margo_addr_self(mid, &observer_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
hret = margo_addr_to_string(mid, observer_addr_str, &observer_addr_str_sz,
observer_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
margo_addr_free(mid, observer_addr);
/* send the oberver's address to a group member, so the group
* member can send us back the corresponding SSG group identifier
*/
MPI_Send(observer_addr_str, 128, MPI_BYTE, 1, 0, MPI_COMM_WORLD);
}
#endif
/* for now, just sleep to give all procs an opportunity to create the group */
/* XXX: we could replace this with a barrier eventually */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
if (is_observer)
{
/* start observging the SSG server group */
sret = ssg_group_observe(g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_observe");
}
/* start observging the SSG server group */
sret = ssg_group_observe(g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_observe");
/* for now, just sleep to give observer a chance to establish connection */
/* XXX: we could replace this with a barrier eventually */
......@@ -201,15 +120,10 @@ int main(int argc, char *argv[])
/* have everyone dump their group state */
ssg_group_dump(g_id);
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/* clean up */
if (is_observer)
{
ssg_group_unobserve(g_id);
}
else
{
ssg_group_destroy(g_id);
}
ssg_group_unobserve(g_id);
ssg_finalize();
margo_finalize(mid);
......@@ -219,31 +133,3 @@ int main(int argc, char *argv[])
return 0;
}
static void group_id_forward_recv_ult(hg_handle_t handle)
{
const struct hg_info *info;
margo_instance_id mid;
ssg_group_id_t *g_id_p;
ssg_group_id_t tmp_g_id;
hg_return_t hret;
info = margo_get_info(handle);
DIE_IF(info == NULL, "margo_get_info");
mid = margo_hg_info_get_instance(info);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_hg_info_get_instance");
g_id_p = (ssg_group_id_t *)margo_registered_data(mid, info->id);
DIE_IF(g_id_p == NULL, "margo_registered_data");
hret = margo_get_input(handle, &tmp_g_id);
DIE_IF(hret != HG_SUCCESS, "margo_get_input");
*g_id_p = ssg_group_id_dup(tmp_g_id);
margo_respond(handle, NULL);
margo_free_input(handle, &tmp_g_id);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(group_id_forward_recv_ult)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment