Commit a67ab260 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

removed op_id from functions that don't need one and added an internal cache of op_id

parent 86516ff3
......@@ -56,28 +56,20 @@ int main(int argc, char** argv) {
ret = mona_msg_init_unexpected(mona, buf, msg_len);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not initialize message");
na_op_id_t op_id = mona_op_create(mona);
ret = mona_msg_send_unexpected(
mona, buf, msg_len, plugin_data, addr, 0, 0, &op_id);
mona, buf, msg_len, plugin_data, addr, 0, 0);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not send message");
mona_op_destroy(mona, op_id);
ret = mona_addr_free(mona, addr);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not free address");
} else {
MPI_Send(addr_str, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
na_op_id_t op_id = mona_op_create(mona);
ret = mona_msg_recv_unexpected(
mona, buf, msg_len, plugin_data, &op_id);
mona, buf, msg_len, plugin_data);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not receive message");
mona_op_destroy(mona, op_id);
printf("[1] Receiving message from rank 0\n");
for(i = mona_msg_get_unexpected_header_size(mona); i < (int)msg_len; i++) {
......
......@@ -147,8 +147,7 @@ na_return_t mona_msg_send_unexpected(
void *plugin_data,
na_addr_t dest_addr,
na_uint8_t dest_id,
na_tag_t tag,
na_op_id_t *op_id);
na_tag_t tag);
na_return_t mona_msg_isend_unexpected(
mona_instance_t mona,
......@@ -165,8 +164,7 @@ na_return_t mona_msg_recv_unexpected(
mona_instance_t mona,
void *buf,
na_size_t buf_size,
void *plugin_data,
na_op_id_t *op_id);
void *plugin_data);
na_return_t mona_msg_irecv_unexpected(
mona_instance_t mona,
......@@ -188,8 +186,7 @@ na_return_t mona_msg_send_expected(
void *plugin_data,
na_addr_t dest_addr,
na_uint8_t dest_id,
na_tag_t tag,
na_op_id_t *op_id);
na_tag_t tag);
na_return_t mona_msg_isend_expected(
mona_instance_t mona,
......@@ -209,8 +206,7 @@ na_return_t mona_msg_recv_expected(
void *plugin_data,
na_addr_t source_addr,
na_uint8_t source_id,
na_tag_t tag,
na_op_id_t *op_id);
na_tag_t tag);
na_return_t mona_msg_irecv_expected(
mona_instance_t mona,
......@@ -280,8 +276,7 @@ na_return_t mona_put(
na_offset_t remote_offset,
na_size_t data_size,
na_addr_t remote_addr,
na_uint8_t remote_id,
na_op_id_t *op_id);
na_uint8_t remote_id);
na_return_t mona_iput(
mona_instance_t mona,
......@@ -303,8 +298,7 @@ na_return_t mona_get(
na_offset_t remote_offset,
na_size_t data_size,
na_addr_t remote_addr,
na_uint8_t remote_id,
na_op_id_t *op_id);
na_uint8_t remote_id);
na_return_t mona_iget(
mona_instance_t mona,
......
......@@ -5,16 +5,24 @@
*/
#include "mona.h"
typedef struct cached_op_id* cached_op_id_t;
typedef struct cached_op_id {
na_op_id_t op_id;
cached_op_id_t next;
} cached_op_id;
typedef struct mona_instance {
na_class_t* na_class;
na_context_t* na_context;
ABT_pool progress_pool;
ABT_xstream progress_xstream;
ABT_thread progress_thread;
na_bool_t owns_progress_pool;
na_bool_t owns_progress_xstream;
na_bool_t owns_na_class_and_context;
na_bool_t finalize_flag;
na_class_t* na_class;
na_context_t* na_context;
ABT_pool progress_pool;
ABT_xstream progress_xstream;
ABT_thread progress_thread;
na_bool_t owns_progress_pool;
na_bool_t owns_progress_xstream;
na_bool_t owns_na_class_and_context;
na_bool_t finalize_flag;
cached_op_id_t op_id_cache;
ABT_mutex op_id_cache_mtx;
} mona_instance;
typedef struct mona_request {
......@@ -148,7 +156,7 @@ mona_instance_t mona_init_na_pool(
na_context_t *na_context,
ABT_pool progress_pool)
{
int ret;
int ret, i;
mona_instance_t mona = (mona_instance_t)calloc(1, sizeof(*mona));
if(!mona) return MONA_INSTANCE_NULL;
mona->na_class = na_class;
......@@ -156,6 +164,19 @@ mona_instance_t mona_init_na_pool(
mona->progress_pool = progress_pool;
mona->progress_xstream = ABT_XSTREAM_NULL;
mona->progress_thread = ABT_THREAD_NULL;
mona->op_id_cache_mtx = ABT_MUTEX_NULL;
ret = ABT_mutex_create(&(mona->op_id_cache_mtx));
if(ret != ABT_SUCCESS) goto error;
mona->op_id_cache = (cached_op_id_t)calloc(1, sizeof(*(mona->op_id_cache)));
mona->op_id_cache->op_id = NA_Op_create(na_class);
cached_op_id_t current = mona->op_id_cache;
for(i=0; i < 15; i++) {
current->next = (cached_op_id_t)calloc(1, sizeof(*current));
current = current->next;
current->op_id = NA_Op_create(na_class);
}
ret = ABT_thread_create(mona->progress_pool, mona_progress_loop,
(void*)mona, ABT_THREAD_ATTR_NULL, &(mona->progress_thread));
......@@ -165,6 +186,8 @@ finish:
return mona;
error:
if(mona->op_id_cache_mtx != ABT_MUTEX_NULL)
ABT_mutex_free(&(mona->op_id_cache_mtx));
free(mona);
mona = MONA_INSTANCE_NULL;
goto finish;
......@@ -181,6 +204,17 @@ na_return_t mona_finalize(mona_instance_t mona)
}
if(mona->owns_progress_pool)
ABT_pool_free(&(mona->progress_pool));
cached_op_id_t cached = mona->op_id_cache;
mona->op_id_cache = NULL;
while(cached) {
cached_op_id_t tmp = cached->next;
NA_Op_destroy(mona->na_class, cached->op_id);
free(cached);
cached = tmp;
}
ABT_mutex_free(&(mona->op_id_cache_mtx));
if(mona->owns_na_class_and_context) {
NA_Context_destroy(
mona->na_class,
......@@ -334,6 +368,32 @@ na_return_t mona_op_destroy(
return NA_Op_destroy(mona->na_class, op_id);
}
static cached_op_id_t get_op_id_from_cache(mona_instance_t mona)
{
cached_op_id_t id;
ABT_mutex_lock(mona->op_id_cache_mtx);
if(mona->op_id_cache) {
id = mona->op_id_cache;
mona->op_id_cache = id->next;
id->next = NULL;
} else {
na_op_id_t op_id = NA_Op_create(mona->na_class);
id = (cached_op_id_t)calloc(1, sizeof(*id));
id->op_id = op_id;
}
ABT_mutex_unlock(mona->op_id_cache_mtx);
return id;
}
static void return_op_id_to_cache(mona_instance_t mona, cached_op_id_t id)
{
ABT_mutex_lock(mona->op_id_cache_mtx);
cached_op_id_t head = mona->op_id_cache;
id->next = head;
mona->op_id_cache = id;
ABT_mutex_unlock(mona->op_id_cache_mtx);
}
void* mona_msg_buf_alloc(
mona_instance_t mona,
na_size_t buf_size,
......@@ -424,16 +484,18 @@ na_return_t mona_msg_send_unexpected(
void *plugin_data,
na_addr_t dest_addr,
na_uint8_t dest_id,
na_tag_t tag,
na_op_id_t *op_id)
na_tag_t tag)
{
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
mona_request req = { ABT_EVENTUAL_NULL };
na_return_t na_ret = mona_msg_isend_unexpected_internal(
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, op_id, &req);
if(na_ret != NA_SUCCESS) {
return na_ret;
}
return mona_wait_internal(&req);
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
return_op_id_to_cache(mona, id);
return na_ret;
}
na_return_t mona_msg_isend_unexpected(
......@@ -486,15 +548,18 @@ na_return_t mona_msg_recv_unexpected(
mona_instance_t mona,
void *buf,
na_size_t buf_size,
void *plugin_data,
na_op_id_t *op_id)
void *plugin_data)
{
mona_request req = { ABT_EVENTUAL_NULL };
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_msg_irecv_unexpected_internal(
mona, buf, buf_size, plugin_data, op_id, &req);
if(na_ret != NA_SUCCESS)
return na_ret;
return mona_wait_internal(&req);
mona, buf, buf_size, plugin_data, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
return_op_id_to_cache(mona, id);
return na_ret;
}
na_return_t mona_msg_irecv_unexpected(
......@@ -558,15 +623,18 @@ na_return_t mona_msg_send_expected(
void *plugin_data,
na_addr_t dest_addr,
na_uint8_t dest_id,
na_tag_t tag,
na_op_id_t *op_id)
na_tag_t tag)
{
mona_request req = { ABT_EVENTUAL_NULL };
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_msg_isend_expected_internal(
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, op_id, &req);
if(na_ret != NA_SUCCESS)
return na_ret;
return mona_wait_internal(&req);
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
return_op_id_to_cache(mona, id);
return na_ret;
}
na_return_t mona_msg_isend_expected(
......@@ -625,15 +693,18 @@ na_return_t mona_msg_recv_expected(
void *plugin_data,
na_addr_t source_addr,
na_uint8_t source_id,
na_tag_t tag,
na_op_id_t *op_id)
na_tag_t tag)
{
mona_request req = { ABT_EVENTUAL_NULL };
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_msg_irecv_expected_internal(
mona, buf, buf_size, plugin_data, source_addr, source_id, tag, op_id, &req);
if(na_ret != NA_SUCCESS)
return na_ret;
return mona_wait_internal(&req);
mona, buf, buf_size, plugin_data, source_addr, source_id, tag, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
return_op_id_to_cache(mona, id);
return na_ret;
}
na_return_t mona_msg_irecv_expected(
......@@ -786,17 +857,20 @@ na_return_t mona_put(
na_offset_t remote_offset,
na_size_t data_size,
na_addr_t remote_addr,
na_uint8_t remote_id,
na_op_id_t *op_id)
na_uint8_t remote_id)
{
mona_request req = { ABT_EVENTUAL_NULL };
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_iput_internal(
mona, local_mem_handle, local_offset,
remote_mem_handle, remote_offset,
data_size, remote_addr, remote_id, op_id, &req);
if(na_ret != NA_SUCCESS)
return na_ret;
return mona_wait_internal(&req);
data_size, remote_addr, remote_id, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
return_op_id_to_cache(mona, id);
return na_ret;
}
na_return_t mona_iput(
......@@ -861,19 +935,22 @@ na_return_t mona_get(
na_offset_t remote_offset,
na_size_t data_size,
na_addr_t remote_addr,
na_uint8_t remote_id,
na_op_id_t *op_id)
na_uint8_t remote_id)
{
mona_request req = { ABT_EVENTUAL_NULL };
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_iget_internal(
mona, local_mem_handle,
local_offset, remote_mem_handle,
remote_offset, data_size,
remote_addr, remote_id,
op_id, &req);
if(na_ret != NA_SUCCESS)
return na_ret;
return mona_wait_internal(&req);
&op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
return_op_id_to_cache(mona, id);
return na_ret;
}
na_return_t mona_iget(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment