Commit 6c87264f authored by Shane Snyder's avatar Shane Snyder

properly acquire/release mid states

parent cbae914a
......@@ -67,6 +67,7 @@ typedef struct ssg_mid_state
hg_id_t swim_dping_ack_rpc_id;
hg_id_t swim_iping_req_rpc_id;
hg_id_t swim_iping_ack_rpc_id;
int ref_count;
struct ssg_mid_state *next;
} ssg_mid_state_t;
......
......@@ -38,8 +38,10 @@
#include "swim-fd/swim-fd.h"
/* SSG helper routine prototypes */
ssg_mid_state_t *ssg_retrieve_mid_state(
ssg_mid_state_t *ssg_acquire_mid_state(
margo_instance_id mid);
static void ssg_release_mid_state(
ssg_mid_state_t *mid_state);
static ssg_group_id_t ssg_group_create_internal(
ssg_mid_state_t *mid_state, const char * group_name,
const char * const group_addr_strs[], int group_size,
......@@ -203,11 +205,13 @@ ssg_group_id_t ssg_group_create(
if (!ssg_rt) return g_id;
mid_state = ssg_retrieve_mid_state(mid);
if(!mid_state) return g_id;
mid_state = ssg_acquire_mid_state(mid);
if (!mid_state) return g_id;
g_id = ssg_group_create_internal(mid_state, group_name, group_addr_strs,
group_size, group_conf, update_cb, update_cb_dat);
if (g_id == SSG_GROUP_ID_INVALID)
ssg_release_mid_state(mid_state);
return g_id;
}
......@@ -234,8 +238,8 @@ ssg_group_id_t ssg_group_create_config(
if (!ssg_rt) goto fini;
mid_state = ssg_retrieve_mid_state(mid);
if(!mid_state) goto fini;
mid_state = ssg_acquire_mid_state(mid);
if (!mid_state) goto fini;
/* open config file for reading */
fd = open(file_name, O_RDONLY);
......@@ -306,6 +310,8 @@ fini:
free(rd_buf);
free(addr_str_buf);
free(addr_strs);
if (g_id == SSG_GROUP_ID_INVALID)
ssg_release_mid_state(mid_state);
return g_id;
}
......@@ -331,7 +337,7 @@ ssg_group_id_t ssg_group_create_mpi(
if (!ssg_rt) goto fini;
mid_state = ssg_retrieve_mid_state(mid);
mid_state = ssg_acquire_mid_state(mid);
if(!mid_state) goto fini;
/* gather the buffer sizes */
......@@ -372,6 +378,8 @@ fini:
free(sizes_psum);
free(addr_str_buf);
free(addr_strs);
if (g_id == SSG_GROUP_ID_INVALID)
ssg_release_mid_state(mid_state);
return g_id;
}
......@@ -406,7 +414,7 @@ ssg_group_id_t ssg_group_create_pmix(
if (!ssg_rt || !PMIx_Initialized()) goto fini;
mid_state = ssg_retrieve_mid_state(mid);
mid_state = ssg_acquire_mid_state(mid);
if(!mid_state) goto fini;
/* we need to store a mapping of PMIx ranks to SSG member IDs so that
......@@ -533,6 +541,8 @@ fini:
/* cleanup before returning */
free(addr_strs);
PMIX_VALUE_FREE(addr_vals, nprocs);
if (g_id == SSG_GROUP_ID_INVALID)
ssg_release_mid_state(mid_state);
return g_id;
}
......@@ -613,7 +623,7 @@ int ssg_group_join(
ABT_rwlock_unlock(ssg_rt->lock);
mid_state = ssg_retrieve_mid_state(mid);
mid_state = ssg_acquire_mid_state(mid);
if(!mid_state) goto fini;
sret = ssg_group_join_send(group_id, g_desc->addr_str, mid_state,
......@@ -705,7 +715,7 @@ int ssg_group_observe(
margo_instance_id mid,
ssg_group_id_t group_id)
{
ssg_mid_state_t *mid_state;
ssg_mid_state_t *mid_state = NULL;
ssg_group_descriptor_t *g_desc;
ssg_observed_group_t *og = NULL;
char *group_name = NULL;
......@@ -745,7 +755,7 @@ int ssg_group_observe(
ABT_rwlock_unlock(ssg_rt->lock);
mid_state = ssg_retrieve_mid_state(mid);
mid_state = ssg_acquire_mid_state(mid);
if(!mid_state) goto fini;
/* send the observe request to a group member to initiate a bulk transfer
......@@ -785,10 +795,18 @@ int ssg_group_observe(
group_name = NULL;
og = NULL;
fini:
if (og) ssg_observed_group_destroy(og);
if (og)
{
ssg_group_view_destroy(&og->view, og->mid_state->mid);
ABT_rwlock_free(&og->lock);
free(og->name);
free(og);
}
free(addr_strs);
free(view_buf);
free(group_name);
if ((sret == SSG_FAILURE) && mid_state)
ssg_release_mid_state(mid_state);
return sret;
}
......@@ -1540,7 +1558,7 @@ void ssg_group_dump(
*** SSG internal helper routines ***
************************************/
ssg_mid_state_t *ssg_retrieve_mid_state(
ssg_mid_state_t *ssg_acquire_mid_state(
margo_instance_id mid)
{
ssg_mid_state_t *mid_state;
......@@ -1557,6 +1575,7 @@ ssg_mid_state_t *ssg_retrieve_mid_state(
ABT_rwlock_unlock(ssg_rt->lock);
return NULL;
}
memset(mid_state, 0, sizeof(*mid_state));
mid_state->mid = mid;
/* get my self address string and ID (which are constant per-mid) */
......@@ -1605,11 +1624,32 @@ ssg_mid_state_t *ssg_retrieve_mid_state(
/* add to mid list */
LL_APPEND(ssg_rt->mid_list, mid_state);
}
mid_state->ref_count++;
ABT_rwlock_unlock(ssg_rt->lock);
return mid_state;
}
static void ssg_release_mid_state(
ssg_mid_state_t *mid_state)
{
ABT_rwlock_wrlock(ssg_rt->lock);
mid_state->ref_count--;
if (!mid_state->ref_count)
{
LL_DELETE(ssg_rt->mid_list, mid_state);
ssg_deregister_rpcs(mid_state);
margo_addr_free(mid_state->mid, mid_state->self_addr);
free(mid_state->self_addr_str);
free(mid_state);
}
ABT_rwlock_unlock(ssg_rt->lock);
return;
}
static ssg_group_id_t ssg_group_create_internal(
ssg_mid_state_t *mid_state, const char * group_name,
const char * const group_addr_strs[], int group_size,
......@@ -1948,6 +1988,8 @@ static void ssg_group_destroy_internal(
free(state);
}
ssg_release_mid_state(g->mid_state);
#ifdef DEBUG
char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
......@@ -1971,6 +2013,7 @@ static void ssg_observed_group_destroy(
ssg_group_view_destroy(&og->view, og->mid_state->mid);
ABT_rwlock_unlock(og->lock);
ssg_release_mid_state(og->mid_state);
ABT_rwlock_free(&og->lock);
free(og->name);
free(og);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment