Commit b636e253 authored by Philip Carns's avatar Philip Carns

add margo_wait_for_finalize() function

- Demonstrated in test program.  This function allows caller to suspend
  until some other entity calls margo_finalize(). If called by a ULT
  that is executing in the same pool as the HG progress thread, then it
  will indicate to the progress thread that it does not need to yield
  execution.
- fixes #6
parent 2bc6ec83
......@@ -18,8 +18,6 @@
* close.
*/
extern ABT_eventual* shutdown_eventual;
static void my_rpc_ult(void *_arg)
{
hg_handle_t *handle = _arg;
......@@ -106,10 +104,12 @@ static void my_rpc_shutdown_ult(void *_arg)
HG_Destroy(*handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
ABT_eventual_set(*shutdown_eventual, NULL, 0);
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
......@@ -13,8 +13,6 @@
#include "my-rpc.h"
ABT_eventual* shutdown_eventual;
/* example server program. Starts HG engine, registers the example RPC type,
* and then executes indefinitely.
*/
......@@ -22,7 +20,6 @@ ABT_eventual* shutdown_eventual;
int main(int argc, char **argv)
{
int ret;
ABT_eventual eventual;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
......@@ -34,8 +31,6 @@ int main(int argc, char **argv)
hg_class_t *hg_class;
int single_pool_mode = 0;
shutdown_eventual = &eventual;
if(argc > 2)
{
fprintf(stderr, "Usage: ./server <single>\n");
......@@ -148,16 +143,21 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
/* suspend this ULT until someone tells us to shut down */
ret = ABT_eventual_create(0, &eventual);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_eventual_create()\n");
return(-1);
}
/* wait for shutdown (assume that margo will be finalized by an RPC) */
ABT_eventual_wait(eventual, NULL);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run and
* then call margo_finalize(). Otherwise, it can call
* margo_wait_for_finalize() on the assumption that it should block until
* some other entity calls margo_finalize().
*
* This example does the latter. Margo will be finalized by a special
* RPC from the client.
*
* This approach will allow the server to idle gracefully even when
* executed in "single" mode, in which the main thread of the server
* daemon and the progress thread for Mercury are executing in the same
* ABT pool.
*/
margo_wait_for_finalize(mid);
if(!single_pool_mode)
{
......
......@@ -40,6 +40,18 @@ margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
*/
void margo_finalize(margo_instance_id mid);
/**
* Suspends the caller until some other entity (e.g. an RPC, thread, or
* signal handler) invokes margo_finalize().
*
* NOTE: This informs Margo that the calling thread no longer needs to be
* scheduled for execution if it is sharing an Argobots pool with the
* progress engine.
*
* @param [in] mid Margo instance
*/
void margo_wait_for_finalize(margo_instance_id mid);
/**
* Retrieve the abt_handler pool that was associated with the instance at
* initialization time
......
......@@ -24,6 +24,13 @@ struct margo_instance
/* internal to margo for this particular instance */
ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag;
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
int finalize_waiters_in_progress_pool;
ABT_mutex finalize_mutex;
ABT_cond finalize_cond;
int table_index;
};
......@@ -61,6 +68,9 @@ margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
return(MARGO_INSTANCE_NULL);
memset(mid, 0, sizeof(*mid));
ABT_mutex_create(&mid->finalize_mutex);
ABT_cond_create(&mid->finalize_cond);
mid->progress_pool = progress_pool;
mid->handler_pool = handler_pool;
mid->hg_class = hg_class;
......@@ -100,6 +110,56 @@ void margo_finalize(margo_instance_id mid)
}
handler_mapping_table_size--;
ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_flag = 1;
ABT_cond_broadcast(mid->finalize_cond);
ABT_mutex_unlock(mid->finalize_mutex);
/* TODO: yuck, there is a race here if someone was really waiting for
* finalize; we can't destroy the data structures out from under them.
* We could fix this by reference counting so that the last caller
* (whether a finalize() caller or wait_for_finalize() caller) knows it
* is safe to turn off the lights on their way out. For now we just leak
* a small amount of memory.
*/
#if 0
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
free(mid);
#endif
return;
}
void margo_wait_for_finalize(margo_instance_id mid)
{
ABT_xstream xstream;
ABT_pool pool;
int ret;
int in_pool = 0;
ret = ABT_xstream_self(&xstream);
if(ret != 0)
return;
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
return;
/* Is this waiter in the same pool as the pool running the progress
* thread?
*/
if(pool == mid->progress_pool)
in_pool = 1;
ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_waiters_in_progress_pool += in_pool;
while(!mid->finalize_flag)
ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex);
ABT_mutex_unlock(mid->finalize_mutex);
return;
}
......@@ -120,7 +180,12 @@ static void hg_progress_fn(void* foo)
if(!mid->hg_progress_shutdown_flag)
{
ABT_pool_get_total_size(mid->progress_pool, &size);
if(size > 0)
/* Are there any other threads executing in this pool that are *not*
* blocked on margo_wait_for_finalize()? If so then, we can't
* sleep here or else those threads will not get a chance to
* execute.
*/
if(size > mid->finalize_waiters_in_progress_pool)
{
HG_Progress(mid->hg_context, 0);
ABT_thread_yield();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment