Commit 50576b4a authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added a cache for requests

parent 45f315d1
......@@ -70,7 +70,6 @@ int main(int argc, char** argv) {
na_tag_t tag = 0;
na_size_t actual_size = 0;
sleep(1);
ret = mona_msg_recv_unexpected(
mona, buf, msg_len, plugin_data,
&source_addr, &tag, &actual_size);
......
......@@ -23,6 +23,8 @@ typedef struct mona_instance {
na_bool_t finalize_flag;
cached_op_id_t op_id_cache;
ABT_mutex op_id_cache_mtx;
mona_request_t req_cache;
ABT_mutex req_cache_mtx;
} mona_instance;
typedef struct mona_request {
......@@ -31,9 +33,10 @@ typedef struct mona_request {
na_addr_t* source_addr;
na_tag_t* tag;
na_size_t* size;
mona_request_t next; // for the request cache
} mona_request;
#define MONA_REQUEST_INITIALIZER { ABT_EVENTUAL_NULL, NULL, NULL, NULL, NULL }
#define MONA_REQUEST_INITIALIZER { ABT_EVENTUAL_NULL, NULL, NULL, NULL, NULL, NULL }
static void mona_progress_loop(void* uarg) {
mona_instance_t mona = (mona_instance_t)uarg;
......@@ -165,14 +168,17 @@ mona_instance_t mona_init_na_pool(
int ret, i;
mona_instance_t mona = (mona_instance_t)calloc(1, sizeof(*mona));
if(!mona) return MONA_INSTANCE_NULL;
mona->na_class = na_class;
mona->na_context = na_context;
mona->progress_pool = progress_pool;
mona->na_class = na_class;
mona->na_context = na_context;
mona->progress_pool = progress_pool;
mona->progress_xstream = ABT_XSTREAM_NULL;
mona->progress_thread = ABT_THREAD_NULL;
mona->op_id_cache_mtx = ABT_MUTEX_NULL;
mona->progress_thread = ABT_THREAD_NULL;
mona->op_id_cache_mtx = ABT_MUTEX_NULL;
mona->req_cache_mtx = ABT_MUTEX_NULL;
ret = ABT_mutex_create(&(mona->op_id_cache_mtx));
if(ret != ABT_SUCCESS) goto error;
ret = ABT_mutex_create(&(mona->req_cache_mtx));
if(ret != ABT_SUCCESS) goto error;
mona->op_id_cache = (cached_op_id_t)calloc(1, sizeof(*(mona->op_id_cache)));
mona->op_id_cache->op_id = NA_Op_create(na_class);
......@@ -194,6 +200,8 @@ finish:
error:
if(mona->op_id_cache_mtx != ABT_MUTEX_NULL)
ABT_mutex_free(&(mona->op_id_cache_mtx));
if(mona->req_cache_mtx != ABT_MUTEX_NULL)
ABT_mutex_free(&(mona->req_cache_mtx));
free(mona);
mona = MONA_INSTANCE_NULL;
goto finish;
......@@ -211,16 +219,25 @@ na_return_t mona_finalize(mona_instance_t mona)
if(mona->owns_progress_pool)
ABT_pool_free(&(mona->progress_pool));
cached_op_id_t cached = mona->op_id_cache;
cached_op_id_t cached_op = mona->op_id_cache;
mona->op_id_cache = NULL;
while(cached) {
cached_op_id_t tmp = cached->next;
NA_Op_destroy(mona->na_class, cached->op_id);
free(cached);
cached = tmp;
while(cached_op) {
cached_op_id_t tmp = cached_op->next;
NA_Op_destroy(mona->na_class, cached_op->op_id);
free(cached_op);
cached_op = tmp;
}
ABT_mutex_free(&(mona->op_id_cache_mtx));
mona_request_t cached_req = mona->req_cache;
mona->req_cache = NULL;
while(cached_req) {
mona_request_t tmp = cached_req->next;
free(cached_req);
cached_req = tmp;
}
ABT_mutex_free(&(mona->req_cache_mtx));
if(mona->owns_na_class_and_context) {
NA_Context_destroy(
mona->na_class,
......@@ -400,6 +417,30 @@ static void return_op_id_to_cache(mona_instance_t mona, cached_op_id_t id)
ABT_mutex_unlock(mona->op_id_cache_mtx);
}
static void return_req_to_cache(mona_instance_t mona, mona_request_t req)
{
ABT_mutex_lock(mona->req_cache_mtx);
mona_request_t head = mona->req_cache;
req->next = head;
mona->req_cache = req;
ABT_mutex_unlock(mona->req_cache_mtx);
}
static mona_request_t get_req_from_cache(mona_instance_t mona)
{
mona_request_t req;
ABT_mutex_lock(mona->req_cache_mtx);
if(mona->req_cache) {
req = mona->req_cache;
mona->req_cache = req->next;
req->next = NULL;
} else {
req = (mona_request_t)calloc(1, sizeof(*req));
}
ABT_mutex_unlock(mona->req_cache_mtx);
return req;
}
void* mona_msg_buf_alloc(
mona_instance_t mona,
na_size_t buf_size,
......@@ -530,14 +571,14 @@ na_return_t mona_msg_isend_unexpected(
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->eventual = ABT_EVENTUAL_NULL;
mona_request_t tmp_req = get_req_from_cache(mona);
na_return_t na_ret = mona_msg_isend_unexpected_internal(
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, op_id, tmp_req);
if(na_ret != NA_SUCCESS)
free(tmp_req);
else
if(na_ret != NA_SUCCESS) {
return_req_to_cache(mona, tmp_req);
} else {
*req = tmp_req;
}
return na_ret;
}
......@@ -606,15 +647,15 @@ na_return_t mona_msg_irecv_unexpected(
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->eventual = ABT_EVENTUAL_NULL;
mona_request_t tmp_req = get_req_from_cache(mona);
na_return_t na_ret = mona_msg_irecv_unexpected_internal(
mona, buf, buf_size, plugin_data,
source_addr, tag, size, op_id, tmp_req);
if(na_ret != NA_SUCCESS)
free(tmp_req);
else
if(na_ret != NA_SUCCESS) {
return_req_to_cache(mona, tmp_req);
} else {
*req = tmp_req;
}
return na_ret;
}
......@@ -690,14 +731,14 @@ na_return_t mona_msg_isend_expected(
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->eventual = ABT_EVENTUAL_NULL;
mona_request_t tmp_req = get_req_from_cache(mona);
na_return_t na_ret = mona_msg_isend_expected_internal(
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, op_id, tmp_req);
if(na_ret != NA_SUCCESS)
free(tmp_req);
else
if(na_ret != NA_SUCCESS) {
return_req_to_cache(mona, tmp_req);
} else {
*req = tmp_req;
}
return na_ret;
}
......@@ -765,14 +806,14 @@ na_return_t mona_msg_irecv_expected(
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->eventual = ABT_EVENTUAL_NULL;
mona_request_t tmp_req = get_req_from_cache(mona);
na_return_t na_ret = mona_msg_irecv_expected_internal(
mona, buf, buf_size, plugin_data, source_addr, source_id, tag, op_id, tmp_req);
if(na_ret != NA_SUCCESS)
return na_ret;
else
if(na_ret != NA_SUCCESS) {
return_req_to_cache(mona, tmp_req);
} else {
*req = tmp_req;
}
return na_ret;
}
......@@ -937,16 +978,17 @@ na_return_t mona_iput(
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
mona_request_t tmp_req = get_req_from_cache(mona);
tmp_req->eventual = ABT_EVENTUAL_NULL;
na_return_t na_ret = mona_iput_internal(
mona, local_mem_handle, local_offset,
remote_mem_handle, remote_offset,
data_size, remote_addr, remote_id, op_id, tmp_req);
if(na_ret != NA_SUCCESS)
free(tmp_req);
else
if(na_ret != NA_SUCCESS) {
return_req_to_cache(mona, tmp_req);
} else {
*req = tmp_req;
}
return na_ret;
}
......@@ -1022,7 +1064,7 @@ na_return_t mona_iget(
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
mona_request_t tmp_req = get_req_from_cache(mona);
tmp_req->eventual = ABT_EVENTUAL_NULL;
na_return_t na_ret = mona_iget_internal(
mona, local_mem_handle,
......@@ -1030,10 +1072,11 @@ na_return_t mona_iget(
remote_offset, data_size,
remote_addr, remote_id,
op_id, tmp_req);
if(na_ret != NA_SUCCESS)
free(tmp_req);
else
if(na_ret != NA_SUCCESS) {
return_req_to_cache(mona, tmp_req);
} else {
*req = tmp_req;
}
return na_ret;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment