Commit ca395517 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'fix-finalize' into 'master'

margo_finalize now doesn't clean up until pending operations are completed

See merge request sds/margo!5
parents afc7cf00 73cc8712
...@@ -804,6 +804,35 @@ void margo_set_param(margo_instance_id mid, int option, const void *param); ...@@ -804,6 +804,35 @@ void margo_set_param(margo_instance_id mid, int option, const void *param);
void margo_get_param(margo_instance_id mid, int option, void *param); void margo_get_param(margo_instance_id mid, int option, void *param);
/**
* @private
* Internal function used by MARGO_REGISTER, not
* supposed to be called by users!
*
* @param mid Margo instance
*
* @return whether margo_finalize() was called.
*/
int __margo_internal_finalize_requested(margo_instance_id mid);
/**
* @private
* Internal function used by MARGO_REGISTER, not
* supposed to be called by users!
*
* @param mid Margo instance
*/
void __margo_internal_incr_pending(margo_instance_id mid);
/**
* @private
* Internal function used by MARGO_REGISTER, not
* supposed to be called by users!
*
* @param mid Margo instance
*/
void __margo_internal_decr_pending(margo_instance_id mid);
/** /**
* macro that registers a function as an RPC. * macro that registers a function as an RPC.
*/ */
...@@ -828,14 +857,25 @@ void margo_get_param(margo_instance_id mid, int option, void *param); ...@@ -828,14 +857,25 @@ void margo_get_param(margo_instance_id mid, int option, void *param);
* @param [in] __name name of handler function * @param [in] __name name of handler function
*/ */
#define DEFINE_MARGO_RPC_HANDLER(__name) \ #define DEFINE_MARGO_RPC_HANDLER(__name) \
void __name##_wrapper(hg_handle_t handle) { \
__name(handle); \
margo_instance_id __mid; \
__mid = margo_hg_handle_get_instance(handle); \
__margo_internal_decr_pending(__mid); \
if(__margo_internal_finalize_requested(__mid)) { \
margo_finalize(__mid); \
} \
} \
hg_return_t __name##_handler(hg_handle_t handle) { \ hg_return_t __name##_handler(hg_handle_t handle) { \
int __ret; \ int __ret; \
ABT_pool __pool; \ ABT_pool __pool; \
margo_instance_id __mid; \ margo_instance_id __mid; \
__mid = margo_hg_handle_get_instance(handle); \ __mid = margo_hg_handle_get_instance(handle); \
if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \ if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
if(__margo_internal_finalize_requested(__mid)) { return(HG_CANCELED); } \
__pool = margo_hg_handle_get_handler_pool(handle); \ __pool = margo_hg_handle_get_handler_pool(handle); \
__ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \ __margo_internal_incr_pending(__mid); \
__ret = ABT_thread_create(__pool, (void (*)(void *))__name##_wrapper, handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \ if(__ret != 0) { \
return(HG_NOMEM_ERROR); \ return(HG_NOMEM_ERROR); \
} \ } \
......
...@@ -93,6 +93,12 @@ struct margo_instance ...@@ -93,6 +93,12 @@ struct margo_instance
ABT_cond finalize_cond; ABT_cond finalize_cond;
struct margo_finalize_cb* finalize_cb; struct margo_finalize_cb* finalize_cb;
/* control logic to prevent margo_finalize from destroying
the instance when some operations are pending */
unsigned pending_operations;
ABT_mutex pending_operations_mtx;
int finalize_requested;
/* control logic for shutting down */ /* control logic for shutting down */
hg_id_t shutdown_rpc_id; hg_id_t shutdown_rpc_id;
int enable_remote_shutdown; int enable_remote_shutdown;
...@@ -348,6 +354,10 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -348,6 +354,10 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->finalize_cb = NULL; mid->finalize_cb = NULL;
mid->enable_remote_shutdown = 0; mid->enable_remote_shutdown = 0;
mid->pending_operations = 0;
ABT_mutex_create(&mid->pending_operations_mtx);
mid->finalize_requested = 0;
mid->timer_list = margo_timer_list_create(); mid->timer_list = margo_timer_list_create();
if(mid->timer_list == NULL) goto err; if(mid->timer_list == NULL) goto err;
...@@ -371,6 +381,7 @@ err: ...@@ -371,6 +381,7 @@ err:
margo_timer_list_free(mid->timer_list); margo_timer_list_free(mid->timer_list);
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
free(mid); free(mid);
} }
return MARGO_INSTANCE_NULL; return MARGO_INSTANCE_NULL;
...@@ -396,6 +407,7 @@ static void margo_cleanup(margo_instance_id mid) ...@@ -396,6 +407,7 @@ static void margo_cleanup(margo_instance_id mid)
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->pending_operations_mtx);
if (mid->owns_progress_pool) if (mid->owns_progress_pool)
{ {
...@@ -432,6 +444,16 @@ void margo_finalize(margo_instance_id mid) ...@@ -432,6 +444,16 @@ void margo_finalize(margo_instance_id mid)
{ {
int do_cleanup; int do_cleanup;
/* check if there are pending operations */
int pending;
ABT_mutex_lock(mid->pending_operations_mtx);
pending = mid->pending_operations;
ABT_mutex_unlock(mid->pending_operations_mtx);
if(pending) {
mid->finalize_requested = 1;
return;
}
/* tell progress thread to wrap things up */ /* tell progress thread to wrap things up */
mid->hg_progress_shutdown_flag = 1; mid->hg_progress_shutdown_flag = 1;
...@@ -1628,3 +1650,24 @@ static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id, ...@@ -1628,3 +1650,24 @@ static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id,
return(id); return(id);
} }
int __margo_internal_finalize_requested(margo_instance_id mid)
{
if(!mid) return 0;
return mid->finalize_requested;
}
void __margo_internal_incr_pending(margo_instance_id mid)
{
if(!mid) return;
ABT_mutex_lock(mid->pending_operations_mtx);
mid->pending_operations += 1;
ABT_mutex_unlock(mid->pending_operations_mtx);
}
void __margo_internal_decr_pending(margo_instance_id mid)
{
if(!mid) return;
ABT_mutex_lock(mid->pending_operations_mtx);
mid->pending_operations -= 1;
ABT_mutex_unlock(mid->pending_operations_mtx);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment