Commit c121d4ad authored by Shane Snyder's avatar Shane Snyder

add ability to load/store multiple group members

parent 80f95513
......@@ -35,6 +35,8 @@ typedef uint64_t ssg_group_id_t;
typedef uint64_t ssg_member_id_t;
#define SSG_MEMBER_ID_INVALID 0
#define SSG_ALL_MEMBERS (-1)
typedef struct ssg_group_config
{
int32_t swim_period_length_ms; /* period length in miliseconds */
......@@ -167,9 +169,27 @@ int ssg_group_destroy(
* NOTE: Use the returned group ID to refer to the group, as the input group ID
* becomes stale after the join is completed.
*/
int ssg_group_join(
#define ssg_group_join(mid, group_id, update_cb, update_cb_dat) \
ssg_group_join_target(mid, group_id, NULL, update_cb, update_cb_dat)
/**
* Adds the calling process to an SSG group, specifying the address string
* of the target group member to send the request to.
*
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_id Input SSG group ID
* @param[in] target_addr_str Address string of group member target
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: Use the returned group ID to refer to the group, as the input group ID
* becomes stale after the join is completed.
*/
int ssg_group_join_target(
margo_instance_id mid,
ssg_group_id_t group_id,
const char * target_addr_str,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
......@@ -179,8 +199,20 @@ int ssg_group_join(
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_leave(
ssg_group_id_t group_id);
#define ssg_group_leave(group_id) \
ssg_group_leave_target(group_id, NULL)
/**
* Removes the calling process from an SSG group, specifying the address string
* of the target group member to send the request to.
*
* @param[in] group_id SSG group ID
* @param[in] target_addr_str Address string of group member target
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_leave_target(
ssg_group_id_t group_id,
const char * target_addr_str);
/**
* Initiates a client's observation of an SSG group.
......@@ -193,9 +225,26 @@ int ssg_group_leave(
* a way of making the membership view of an existing SSG group available to
* non-group members.
*/
int ssg_group_observe(
#define ssg_group_observe(mid, group_id) \
ssg_group_observe_target(mid, group_id, NULL)
/**
* Initiates a client's observation of an SSG group, specifying the address string
* of the target group member to send the request to.
*
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_id SSG group ID
* @param[in] target_addr_str Address string of group member target
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: The "client" cannot be a member of the group -- observation is merely
* a way of making the membership view of an existing SSG group available to
* non-group members.
*/
int ssg_group_observe_target(
margo_instance_id mid,
ssg_group_id_t group_id);
ssg_group_id_t group_id,
const char * target_addr_str);
/**
* Terminates a client's observation of an SSG group.
......@@ -291,13 +340,15 @@ int ssg_get_group_member_ids_from_range(
/**
* Retrieves the HG address string associated with an SSG group identifier.
*
* @param[in] group_id SSG group ID
* @param[in] group_id SSG group ID
* @param[in] addr_index Index (0-based) in GID's address list array
* @returns address string on success, NULL otherwise
*
* NOTE: returned string must be freed by caller.
*/
char *ssg_group_id_get_addr_str(
ssg_group_id_t group_id);
ssg_group_id_t group_id,
unsigned int addr_index);
/**
* Retrieves the credential associated with an SSG group identifier.
......@@ -312,24 +363,28 @@ int64_t ssg_group_id_get_cred(
* Serializes an SSG group identifier into a buffer.
*
* @param[in] group_id SSG group ID
* @param[in] num_addrs Number of group addressses to serialize (SSG_ALL_MEMBERS for all)
* @param[out] buf_p Pointer to store allocated buffer in
* @param[out] buf_size_p Pointer to store buffer size in
*/
void ssg_group_id_serialize(
ssg_group_id_t group_id,
int num_addrs,
char ** buf_p,
size_t * buf_size_p);
/**
* Deserializes an SSG group identifier from a buffer.
*
* @param[in] buf Buffer containing the SSG group identifier
* @param[in] buf_size Size of given buffer
* @param[out] group_id_p Pointer to store group identifier in
* @param[in] buf Buffer containing the SSG group identifier
* @param[in] buf_size Size of given buffer
* @param[in/out] num_addrs Number of group addresses deserialized (input serves as max)
* @param[out] group_id_p Pointer to store group identifier in
*/
void ssg_group_id_deserialize(
const char * buf,
size_t buf_size,
int * num_addrs,
ssg_group_id_t * group_id_p);
/**
......@@ -337,21 +392,25 @@ void ssg_group_id_deserialize(
*
* @param[in] file_name File to store the group ID in
* @param[in] group_id SSG group ID
* @param[in] num_addrs Number of group addressses to serialize (SSG_ALL_MEMBERS for all)
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_id_store(
const char * file_name,
ssg_group_id_t group_id);
ssg_group_id_t group_id,
int num_addrs);
/**
* Loads an SSG group identifier from the given file name.
*
* @param[in] file_name File to store the group ID in
* @param[out] group_id_p Pointer to store group identifier in
* @param[in] file_name File to store the group ID in
* @param[in/out] num_addrs Number of group addresses deserialized (input serves as max)
* @param[out] group_id_p Pointer to store group identifier in
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_id_load(
const char * file_name,
int * num_addrs,
ssg_group_id_t * group_id_p);
/** Dumps details of caller's membership in a given group to stdout.
......
......@@ -74,8 +74,8 @@ typedef struct ssg_mid_state
typedef struct ssg_group_descriptor
{
ssg_group_id_t g_id;
uint64_t magic_nr;
char *addr_str;
size_t num_addr_strs;
char **addr_strs;
int64_t cred;
int owner_status;
union
......@@ -162,7 +162,7 @@ void ssg_deregister_rpcs(
ssg_mid_state_t *mid_state);
int ssg_group_join_send(
ssg_group_id_t g_id,
const char * target_addr_str,
hg_addr_t group_target_addr,
ssg_mid_state_t * mid_state,
char ** group_name,
int * group_size,
......@@ -170,11 +170,11 @@ int ssg_group_join_send(
void ** view_buf);
int ssg_group_leave_send(
ssg_group_id_t g_id,
hg_addr_t target_addr,
hg_addr_t group_target_addr,
ssg_mid_state_t * mid_state);
int ssg_group_observe_send(
ssg_group_id_t g_id,
const char * target_addr_str,
hg_addr_t group_target_addr,
ssg_mid_state_t * mid_state,
char ** group_name,
int * group_size,
......
......@@ -113,14 +113,13 @@ void ssg_deregister_rpcs(
*/
int ssg_group_join_send(
ssg_group_id_t g_id,
const char * target_addr_str,
hg_addr_t group_target_addr,
ssg_mid_state_t * mid_state,
char ** group_name,
int * group_size,
ssg_group_config_t * group_config,
void ** view_buf)
{
hg_addr_t group_target_addr;
hg_handle_t handle = HG_HANDLE_NULL;
hg_bulk_t bulk_handle = HG_BULK_NULL;
void *tmp_view_buf = NULL, *b;
......@@ -135,10 +134,6 @@ int ssg_group_join_send(
*view_buf = NULL;
/* send join request to given group member */
hret = margo_addr_lookup(mid_state->mid, target_addr_str,
&group_target_addr);
if (hret != HG_SUCCESS) goto fini;
hret = margo_create(mid_state->mid, group_target_addr,
mid_state->join_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
......@@ -214,8 +209,6 @@ int ssg_group_join_send(
if (sret == SSG_SUCCESS)
tmp_view_buf = NULL; /* don't free on success */
fini:
if (group_target_addr != HG_ADDR_NULL)
margo_addr_free(mid_state->mid, group_target_addr);
if (handle != HG_HANDLE_NULL) margo_destroy(handle);
if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
free(tmp_view_buf);
......@@ -236,6 +229,7 @@ static void ssg_group_join_recv_ult(
hg_size_t view_buf_size;
hg_bulk_t bulk_handle = HG_BULK_NULL;
ssg_member_update_t join_update;
int group_size;
int sret;
hg_return_t hret;
......@@ -274,6 +268,7 @@ static void ssg_group_join_recv_ult(
margo_free_input(handle, &join_req);
goto fini;
}
group_size = g_desc->g_data.g->view.size;
if (view_size_requested >= view_buf_size)
{
......@@ -308,7 +303,7 @@ static void ssg_group_join_recv_ult(
/* set the response and send back */
join_resp.group_name = g_desc->g_data.g->name;
join_resp.group_size = (int)g_desc->g_data.g->view.size;
join_resp.group_size = group_size;
memcpy(&join_resp.group_config, &g_desc->g_data.g->config,
sizeof(join_resp.group_config));
join_resp.view_buf_size = view_buf_size;
......@@ -332,17 +327,18 @@ DEFINE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult)
*/
int ssg_group_leave_send(
ssg_group_id_t g_id,
hg_addr_t target_addr,
hg_addr_t group_target_addr,
ssg_mid_state_t * mid_state)
{
hg_handle_t handle = HG_HANDLE_NULL;
ssg_group_leave_request_t leave_req;
ssg_group_leave_response_t leave_resp;
hg_return_t hret;
int sret = SSG_FAILURE;
/* send leave request to given group member */
hret = margo_create(mid_state->mid, target_addr,
hret = margo_create(mid_state->mid, group_target_addr,
mid_state->leave_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
......@@ -427,13 +423,12 @@ DEFINE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult)
*/
int ssg_group_observe_send(
ssg_group_id_t g_id,
const char * target_addr_str,
hg_addr_t group_target_addr,
ssg_mid_state_t * mid_state,
char ** group_name,
int * group_size,
void ** view_buf)
{
hg_addr_t group_target_addr = HG_ADDR_NULL;
hg_handle_t handle = HG_HANDLE_NULL;
hg_bulk_t bulk_handle = HG_BULK_NULL;
void *tmp_view_buf = NULL, *b;
......@@ -448,10 +443,6 @@ int ssg_group_observe_send(
*view_buf = NULL;
/* lookup the address of the group member associated with the descriptor */
hret = margo_addr_lookup(mid_state->mid, target_addr_str,
&group_target_addr);
if (hret != HG_SUCCESS) goto fini;
hret = margo_create(mid_state->mid, group_target_addr,
mid_state->observe_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
......@@ -526,8 +517,6 @@ int ssg_group_observe_send(
if (sret == SSG_SUCCESS)
tmp_view_buf = NULL; /* don't free on success */
fini:
if (group_target_addr != HG_ADDR_NULL)
margo_addr_free(mid_state->mid, group_target_addr);
if (handle != HG_HANDLE_NULL) margo_destroy(handle);
if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
free(tmp_view_buf);
......@@ -547,6 +536,7 @@ static void ssg_group_observe_recv_ult(
void *view_buf = NULL;
hg_size_t view_buf_size;
hg_bulk_t bulk_handle = HG_BULK_NULL;
int group_size;
int sret;
hg_return_t hret;
......@@ -585,6 +575,7 @@ static void ssg_group_observe_recv_ult(
margo_free_input(handle, &observe_req);
goto fini;
}
group_size = g_desc->g_data.g->view.size;
ABT_rwlock_unlock(ssg_rt->lock);
......@@ -611,7 +602,7 @@ static void ssg_group_observe_recv_ult(
/* set the response and send back */
observe_resp.group_name = g_desc->g_data.g->name;
observe_resp.group_size = (int)g_desc->g_data.g->view.size;
observe_resp.group_size = group_size;
observe_resp.view_buf_size = view_buf_size;
observe_resp.ret = SSG_SUCCESS;
fini:
......
This diff is collapsed.
......@@ -120,6 +120,7 @@ int main(int argc, char *argv[])
struct group_join_leave_opts opts;
margo_instance_id mid = MARGO_INSTANCE_NULL;
ssg_group_id_t g_id = SSG_GROUP_ID_INVALID;
int num_addrs;
int sret;
/* set any default options (that may be overwritten by cmd args) */
......@@ -144,8 +145,10 @@ int main(int argc, char *argv[])
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
/* load GID from file */
sret = ssg_group_id_load(opts.gid_file, &g_id);
num_addrs = SSG_ALL_MEMBERS;
sret = ssg_group_id_load(opts.gid_file, &num_addrs, &g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_id_load");
DIE_IF(num_addrs < 1, "ssg_group_id_load");
/* sleep until time to join */
if (opts.join_time > 0)
......
......@@ -205,9 +205,9 @@ int main(int argc, char *argv[])
if (opts.gid_file)
{
snprintf(scratch, 1024, "%s-%d", opts.gid_file, 1);
ssg_group_id_store(scratch, g1_id);
ssg_group_id_store(scratch, g1_id, 1);
snprintf(scratch, 1024, "%s-%d", opts.gid_file, 2);
ssg_group_id_store(scratch, g2_id);
ssg_group_id_store(scratch, g2_id, 1);
}
/* sleep for given duration to allow group time to run */
......
......@@ -216,9 +216,9 @@ int main(int argc, char *argv[])
if (opts.gid_file)
{
snprintf(scratch, 1024, "%s-%d", opts.gid_file, 1);
ssg_group_id_store(scratch, g1_id);
ssg_group_id_store(scratch, g1_id, 1);
snprintf(scratch, 1024, "%s-%d", opts.gid_file, 2);
ssg_group_id_store(scratch, g2_id);
ssg_group_id_store(scratch, g2_id, 1);
}
/* sleep for given duration to allow group time to run */
......
......@@ -271,7 +271,7 @@ int main(int argc, char *argv[])
/* store the gid if requested */
if (opts.gid_file)
ssg_group_id_store(opts.gid_file, g_id);
ssg_group_id_store(opts.gid_file, g_id, 1);
/* sleep for given duration to allow group time to run */
if (opts.shutdown_time > 0)
......
......@@ -195,7 +195,7 @@ int main(int argc, char *argv[])
/* store the gid if requested */
if (opts.gid_file)
ssg_group_id_store(opts.gid_file, g_id);
ssg_group_id_store(opts.gid_file, g_id, SSG_ALL_MEMBERS);
/* sleep for given duration to allow group time to run */
if (opts.shutdown_time > 0)
......
......@@ -85,6 +85,7 @@ int main(int argc, char *argv[])
const char *addr_str;
const char *gid_file;
ssg_group_id_t g_id;
int num_addrs;
int64_t ssg_cred;
uint32_t drc_credential_id;
drc_info_handle_t drc_credential_info;
......@@ -112,8 +113,10 @@ int main(int argc, char *argv[])
ret = ssg_init();
DIE_IF(ret != SSG_SUCCESS, "ssg_init");
ret = ssg_group_id_load(gid_file, &g_id);
num_addrs = 1;
ret = ssg_group_id_load(gid_file, &num_addrs, &g_id);
DIE_IF(ret != SSG_SUCCESS, "ssg_group_id_load");
DIE_IF(num_addrs != 1, "ssg_group_id_load");
ssg_cred = ssg_group_id_get_cred(g_id);
DIE_IF(ssg_cred == -1, "ssg_group_id_get_cred");
......
......@@ -77,6 +77,7 @@ int main(int argc, char *argv[])
const char *addr_str;
const char *gid_file;
ssg_group_id_t g_id;
int num_addrs;
int sret;
parse_args(argc, argv, &sleep_time, &addr_str, &gid_file);
......@@ -94,8 +95,10 @@ int main(int argc, char *argv[])
sret = ssg_init();
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
sret = ssg_group_id_load(gid_file, &g_id);
num_addrs = SSG_ALL_MEMBERS;
sret = ssg_group_id_load(gid_file, &num_addrs, &g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_id_load");
DIE_IF(num_addrs < 1, "ssg_group_id_load");
/* start observging the SSG server group */
sret = ssg_group_observe(mid, g_id);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment