Commit fd88d80a authored by Shane Snyder's avatar Shane Snyder

add margo_thread_sleep() functionality

I factored the sorted queue implementation Phil created for
monitoring timeouts out into separate files (margo-timer.c/h).
There is now an interface for submitting timers that define
a callback function that's executed when the timers expire.
margo_thread_sleep() and margo_forward_timed() will both use
this interface, though only margo_thread_sleep() is implemented
at the moment.
parent 17f8c22d
......@@ -15,7 +15,7 @@ CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = include/margo.h
include_HEADERS = include/margo.h include/margo-timer.h
EXTRA_DIST += \
prepare.sh
......
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
static int use_abt_sleep;
static void sleep_fn(void *arg);
int main(int argc, char **argv)
{
ABT_thread threads[4];
int t_ids[4];
int i;
int ret;
ABT_xstream xstream;
ABT_pool pool;
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
if(argc == 1)
{
use_abt_sleep = 0;
}
else if((argc == 2) && (strcmp(argv[1], "ABT") == 0))
{
use_abt_sleep = 1;
}
else
{
fprintf(stderr, "Usage: %s [ABT]\n", argv[0]);
fprintf(stderr, "\tABT: use ABT sleep mechanism, rather than POSIX sleep.\n");
return(-1);
}
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init("tcp://localhost:1234", HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* retrieve current pool to use for ULT creation */
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */
/* provide argobots pools for driving communication progress and
* executing rpc handlers as well as class and context for Mercury
* communication. The rpc handler pool is null in this example program
* because this is a pure client that will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init(pool, ABT_POOL_NULL, hg_context, hg_class);
for(i=0; i<4; i++)
{
t_ids[i] = i;
/* start up the sleeper threads */
ret = ABT_thread_create(pool, sleep_fn, &t_ids[i],
ABT_THREAD_ATTR_NULL, &threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_create()\n");
return(-1);
}
}
/* yield to one of the threads */
ABT_thread_yield_to(threads[0]);
for(i=0; i<4; i++)
{
ret = ABT_thread_join(threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
ret = ABT_thread_free(&threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
}
/* shut down everything */
margo_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0);
}
static void sleep_fn(void *arg)
{
int my_tid = *(int *)arg;
if(use_abt_sleep)
margo_thread_sleep(2*1000.0);
else
sleep(2);
printf("TID: %d sleep end\n", my_tid);
return;
}
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_TIMER
#define __MARGO_TIMER
#ifdef __cplusplus
extern "C" {
#endif
typedef void (*margo_timer_cb_fn)(void *);
typedef struct margo_timed_element_s
{
margo_timer_cb_fn cb_fn;
void *cb_dat;
double expiration;
struct margo_timed_element_s *next;
struct margo_timed_element_s *prev;
} margo_timed_element;
void margo_timer_init(
void);
void margo_timer_cleanup(
void);
void margo_thread_sleep(
double timeout_ms);
void margo_create_timer(
margo_timer_cb_fn cb_fn,
void *cb_dat,
double timeout_ms);
void margo_check_timers(
void);
#ifdef __cplusplus
}
#endif
#endif /* __MARGO_TIMER */
......@@ -17,6 +17,7 @@ extern "C" {
#include <abt.h>
#include <ev.h>
#include "margo-timer.h"
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
......
src_libmargo_a_SOURCES += \
src/margo.c
src/margo.c \
src/margo-timer.c
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
#include <abt.h>
#include "margo-timer.h"
#include "utlist.h"
static void margo_queue_timer(margo_timed_element *el);
static ABT_mutex timer_mutex = ABT_MUTEX_NULL;
static margo_timed_element *timer_head = NULL;
void margo_timer_init()
{
ABT_mutex_create(&timer_mutex);
return;
}
void margo_timer_cleanup()
{
margo_timed_element *cur;
if(timer_mutex == ABT_MUTEX_NULL)
return;
ABT_mutex_lock(timer_mutex);
/* free any remaining timers */
while(timer_head)
{
cur = timer_head;
DL_DELETE(timer_head, cur);
free(cur);
}
ABT_mutex_unlock(timer_mutex);
ABT_mutex_free(&timer_mutex);
timer_mutex = ABT_MUTEX_NULL;
return;
}
typedef struct
{
ABT_mutex sleep_mutex;
ABT_cond sleep_cond;
} margo_thread_sleep_cb_dat;
static void margo_thread_sleep_cb(void *arg)
{
margo_thread_sleep_cb_dat *cb_dat =
(margo_thread_sleep_cb_dat *)arg;
/* wake up the sleeping thread */
ABT_mutex_lock(cb_dat->sleep_mutex);
ABT_cond_signal(cb_dat->sleep_cond);
ABT_mutex_unlock(cb_dat->sleep_mutex);
return;
}
void margo_thread_sleep(double timeout_ms)
{
margo_thread_sleep_cb_dat *cb_dat;
/* set data needed for callback */
cb_dat = malloc(sizeof(margo_thread_sleep_cb_dat));
assert(cb_dat);
memset(cb_dat, 0 , sizeof(*cb_dat));
ABT_mutex_create(&(cb_dat->sleep_mutex));
ABT_cond_create(&(cb_dat->sleep_cond));
/* create timer */
margo_create_timer(margo_thread_sleep_cb, cb_dat, timeout_ms);
/* yield thread for specified timeout */
ABT_mutex_lock(cb_dat->sleep_mutex);
ABT_cond_wait(cb_dat->sleep_cond, cb_dat->sleep_mutex);
ABT_mutex_unlock(cb_dat->sleep_mutex);
return;
}
void margo_create_timer(margo_timer_cb_fn cb_fn, void *cb_dat, double timeout_ms)
{
margo_timed_element *el;
el = malloc(sizeof(margo_timed_element));
assert(el);
el->cb_fn = cb_fn;
el->cb_dat = cb_dat;
el->expiration = ABT_get_wtime() + (timeout_ms/1000);
el->prev = el->next = NULL;
margo_queue_timer(el);
return;
}
void margo_check_timers()
{
margo_timed_element *cur;
double now = ABT_get_wtime();
assert(timer_mutex != ABT_MUTEX_NULL);
ABT_mutex_lock(timer_mutex);
/* iterate through timer list, performing timeout action
* for all elements which have passed expiration time
*/
while(timer_head && (timer_head->expiration < now))
{
cur = timer_head;
DL_DELETE(timer_head, cur);
cur->next = cur->prev = NULL;
/* execute callback */
cur->cb_fn(cur->cb_dat);
free(cur);
}
ABT_mutex_unlock(timer_mutex);
return;
}
static void margo_queue_timer(margo_timed_element *el)
{
margo_timed_element *cur;
assert(timer_mutex != ABT_MUTEX_NULL);
ABT_mutex_lock(timer_mutex);
/* if list of timers is empty, put ourselves on it */
if(!timer_head)
{
DL_APPEND(timer_head, el);
}
else
{
/* something else already in queue, keep it sorted in ascending order
* of expiration time
*/
cur = timer_head;
do
{
/* walk backwards through queue */
cur = cur->prev;
/* as soon as we find an element that expires before this one,
* then we add ours after it
*/
if(cur->expiration < el->expiration)
{
DL_APPEND_ELEM(timer_head, cur, el);
break;
}
}while(cur != timer_head);
/* if we never found one with an expiration before this one, then
* this one is the new head
*/
if(el->prev == NULL && el->next == NULL)
DL_PREPEND(timer_head, el);
}
ABT_mutex_unlock(timer_mutex);
return;
}
......@@ -21,20 +21,6 @@
*/
#include <mercury_core.h>
struct timed_element
{
struct timespec expiration;
hg_handle_t handle;
struct timed_element *next;
struct timed_element *prev;
};
#ifdef CLOCK_REALTIME_COARSE
clockid_t clk_id = CLOCK_REALTIME_COARSE;
#else
clockid_t clk_id = CLOCK_REALTIME;
#endif
struct margo_instance
{
/* provided by caller */
......@@ -53,10 +39,6 @@ struct margo_instance
ABT_mutex finalize_mutex;
ABT_cond finalize_cond;
/* pending operations with timeouts */
ABT_mutex timer_mutex;
struct timed_element *timer_head;
int table_index;
};
......@@ -97,7 +79,7 @@ margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
ABT_mutex_create(&mid->finalize_mutex);
ABT_cond_create(&mid->finalize_cond);
ABT_mutex_create(&mid->timer_mutex);
margo_timer_init();
mid->progress_pool = progress_pool;
mid->handler_pool = handler_pool;
......@@ -143,6 +125,8 @@ void margo_finalize(margo_instance_id mid)
ABT_cond_broadcast(mid->finalize_cond);
ABT_mutex_unlock(mid->finalize_mutex);
margo_timer_cleanup();
/* TODO: yuck, there is a race here if someone was really waiting for
* finalize; we can't destroy the data structures out from under them.
* We could fix this by reference counting so that the last caller
......@@ -153,7 +137,6 @@ void margo_finalize(margo_instance_id mid)
#if 0
ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond);
ABT_mutex_free(&mid->timer_mutex);
free(mid);
#endif
......@@ -199,8 +182,6 @@ static void hg_progress_fn(void* foo)
unsigned int actual_count;
struct margo_instance *mid = (struct margo_instance *)foo;
size_t size;
struct timespec now;
struct timed_element *cur;
while(!mid->hg_progress_shutdown_flag)
{
......@@ -227,27 +208,8 @@ static void hg_progress_fn(void* foo)
}
}
clock_gettime(clk_id, &now);
ABT_mutex_lock(mid->timer_mutex);
while(mid->timer_head &&
(mid->timer_head->expiration.tv_sec < now.tv_sec ||
(mid->timer_head->expiration.tv_sec == now.tv_sec &&
mid->timer_head->expiration.tv_nsec < now.tv_nsec)))
{
cur = mid->timer_head;
DL_DELETE(mid->timer_head, cur);
cur->next = NULL;
cur->prev = NULL;
//printf("FOO: I would like to cancel operation with tv_sec %ld, tv_nsec %ld\n", (long)cur->expiration.tv_sec, cur->expiration.tv_nsec);
HG_Core_cancel(cur->handle);
}
ABT_mutex_unlock(mid->timer_mutex);
/* TODO: check for timeouts here. If timer_head not null, then check
* current time and compare against first element. Keep walking list
* cancelling operations until we find non-expired element.
*/
/* check for any expired timers */
margo_check_timers();
}
return;
......@@ -279,21 +241,6 @@ hg_return_t margo_forward_timed(
ABT_eventual eventual;
int ret;
hg_return_t* waited_hret;
struct timed_element el;
struct timed_element *cur;
/* calculate expiration time */
el.handle = handle;
el.prev = NULL;
el.next = NULL;
clock_gettime(clk_id, &el.expiration);
el.expiration.tv_sec += timeout_ms/1000;
el.expiration.tv_nsec += fmod(timeout_ms, 1000)*1000.0*1000.0;
if(el.expiration.tv_nsec > 1000000000)
{
el.expiration.tv_nsec -= 1000000000;
el.expiration.tv_sec++;
}
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
......@@ -301,42 +248,7 @@ hg_return_t margo_forward_timed(
return(HG_NOMEM_ERROR);
}
/* TODO: split this out into a subroutine */
/* track timer */
ABT_mutex_lock(mid->timer_mutex);
/* if queue of expiring ops is empty, put ourselves on it */
if(!mid->timer_head)
DL_APPEND(mid->timer_head, &el);
else
{
/* something else already in queue, keep it sorted in ascending order
* of expiration time
*/
cur = mid->timer_head;
do
{
/* walk backwards through queue */
cur = cur->prev;
/* as soon as we find an element that expires before this one,
* then we add ours after it
*/
if(cur->expiration.tv_sec < el.expiration.tv_sec ||
(cur->expiration.tv_sec == el.expiration.tv_sec &&
cur->expiration.tv_nsec < el.expiration.tv_nsec))
{
DL_APPEND_ELEM(mid->timer_head, cur, &el);
break;
}
}while(cur != mid->timer_head);
/* if we never found one with an expiration before this one, then
* this one is the new head
*/
if(el.prev == NULL && el.next == NULL)
DL_PREPEND(mid->timer_head, &el);
}
ABT_mutex_unlock(mid->timer_mutex);
/* TODO: timer interface: add element */
hret = HG_Forward(handle, margo_cb, &eventual, in_struct);
if(hret == 0)
......@@ -345,11 +257,7 @@ hg_return_t margo_forward_timed(
hret = *waited_hret;
}
/* remove timer if it is still in place */
ABT_mutex_lock(mid->timer_mutex);
if(el.prev || el.next)
DL_DELETE(mid->timer_head, &el);
ABT_mutex_unlock(mid->timer_mutex);
/* TODO: remove timer if it is still in place */
ABT_eventual_free(&eventual);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment