Commit 9c73b20d authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

changed arguments of recv functions to be able to retrieve the tag, source address, and size

parent a67ab260
......@@ -57,7 +57,7 @@ int main(int argc, char** argv) {
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not initialize message");
ret = mona_msg_send_unexpected(
mona, buf, msg_len, plugin_data, addr, 0, 0);
mona, buf, msg_len, plugin_data, addr, 0, 1234);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not send message");
ret = mona_addr_free(mona, addr);
......@@ -65,12 +65,20 @@ int main(int argc, char** argv) {
} else {
MPI_Send(addr_str, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
na_addr_t source_addr = NA_ADDR_NULL;
na_tag_t tag = 0;
na_size_t actual_size = 0;
ret = mona_msg_recv_unexpected(
mona, buf, msg_len, plugin_data);
mona, buf, msg_len, plugin_data,
&source_addr, &tag, &actual_size);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not receive message");
printf("[1] Receiving message from rank 0\n");
printf("[1] Receiving message from rank 0 with tag %d and size %ld\n", tag, actual_size);
ret = mona_addr_free(mona, source_addr);
ASSERT_MESSAGE(ret == NA_SUCCESS, "Could not free source address");
for(i = mona_msg_get_unexpected_header_size(mona); i < (int)msg_len; i++) {
ASSERT_MESSAGE(buf[i] == i % 32, "Incorrect byte received");
......
......@@ -164,13 +164,19 @@ na_return_t mona_msg_recv_unexpected(
mona_instance_t mona,
void *buf,
na_size_t buf_size,
void *plugin_data);
void *plugin_data,
na_addr_t* source_addr,
na_tag_t* tag,
na_size_t* size);
na_return_t mona_msg_irecv_unexpected(
mona_instance_t mona,
void *buf,
na_size_t buf_size,
void *plugin_data,
na_addr_t* source_addr,
na_tag_t* tag,
na_size_t* size,
na_op_id_t *op_id,
mona_request_t* req);
......
......@@ -26,9 +26,15 @@ typedef struct mona_instance {
} mona_instance;
typedef struct mona_request {
ABT_eventual eventual;
ABT_eventual eventual;
mona_instance_t mona;
na_addr_t* source_addr;
na_tag_t* tag;
na_size_t* size;
} mona_request;
#define MONA_REQUEST_INITIALIZER { ABT_EVENTUAL_NULL, NULL, NULL, NULL, NULL }
static void mona_progress_loop(void* uarg) {
mona_instance_t mona = (mona_instance_t)uarg;
na_return_t trigger_ret, na_ret;
......@@ -446,6 +452,20 @@ static int mona_callback(const struct na_cb_info *info)
{
na_return_t na_ret = info->ret;
mona_request_t req = (mona_request_t)(info->arg);
if(info->type == NA_CB_RECV_UNEXPECTED) {
na_addr_t source = info->info.recv_unexpected.source;
na_tag_t tag = info->info.recv_unexpected.tag;
na_size_t size = info->info.recv_unexpected.actual_buf_size;
if(req->source_addr) {
mona_addr_dup(req->mona, source, req->source_addr);
}
if(req->tag) {
*(req->tag) = tag;
}
if(req->size) {
*(req->size) = size;
}
}
ABT_eventual_set(req->eventual, &na_ret, sizeof(na_ret));
return NA_SUCCESS;
}
......@@ -488,7 +508,7 @@ na_return_t mona_msg_send_unexpected(
{
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
mona_request req = { ABT_EVENTUAL_NULL };
mona_request req = MONA_REQUEST_INITIALIZER;
na_return_t na_ret = mona_msg_isend_unexpected_internal(
mona, buf, buf_size, plugin_data, dest_addr, dest_id, tag, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
......@@ -525,6 +545,9 @@ static na_return_t mona_msg_irecv_unexpected_internal(
void *buf,
na_size_t buf_size,
void *plugin_data,
na_addr_t* source_addr,
na_tag_t* tag,
na_size_t* size,
na_op_id_t *op_id,
mona_request_t req)
{
......@@ -536,7 +559,12 @@ static na_return_t mona_msg_irecv_unexpected_internal(
if(ret != 0)
return NA_NOMEM;
req->eventual = eventual;
req->eventual = eventual;
req->mona = mona;
req->source_addr = source_addr;
req->tag = tag;
req->size = size;
return NA_Msg_recv_unexpected(
mona->na_class, mona->na_context,
mona_callback, (void*)req,
......@@ -548,13 +576,17 @@ na_return_t mona_msg_recv_unexpected(
mona_instance_t mona,
void *buf,
na_size_t buf_size,
void *plugin_data)
void *plugin_data,
na_addr_t* source_addr,
na_tag_t* tag,
na_size_t* size)
{
mona_request req = { ABT_EVENTUAL_NULL };
mona_request req = MONA_REQUEST_INITIALIZER;
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_msg_irecv_unexpected_internal(
mona, buf, buf_size, plugin_data, &op_id, &req);
mona, buf, buf_size, plugin_data,
source_addr, tag, size, &op_id, &req);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_wait_internal(&req);
finish:
......@@ -567,13 +599,17 @@ na_return_t mona_msg_irecv_unexpected(
void *buf,
na_size_t buf_size,
void *plugin_data,
na_addr_t* source_addr,
na_tag_t* tag,
na_size_t* size,
na_op_id_t *op_id,
mona_request_t* req)
{
mona_request_t tmp_req = calloc(1, sizeof(*tmp_req));
tmp_req->eventual = ABT_EVENTUAL_NULL;
na_return_t na_ret = mona_msg_irecv_unexpected_internal(
mona, buf, buf_size, plugin_data, op_id, tmp_req);
mona, buf, buf_size, plugin_data,
source_addr, tag, size, op_id, tmp_req);
if(na_ret != NA_SUCCESS)
free(tmp_req);
else
......@@ -608,7 +644,12 @@ static na_return_t mona_msg_isend_expected_internal(
if(ret != 0)
return NA_NOMEM;
req->eventual = eventual;
req->eventual = eventual;
req->mona = mona;
req->source_addr = NULL;
req->tag = NULL;
req->size = NULL;
return NA_Msg_send_expected(
mona->na_class, mona->na_context,
mona_callback, (void*)req,
......@@ -625,7 +666,7 @@ na_return_t mona_msg_send_expected(
na_uint8_t dest_id,
na_tag_t tag)
{
mona_request req = { ABT_EVENTUAL_NULL };
mona_request req = MONA_REQUEST_INITIALIZER;
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_msg_isend_expected_internal(
......@@ -678,7 +719,12 @@ static na_return_t mona_msg_irecv_expected_internal(
if(ret != 0)
return NA_NOMEM;
req->eventual = eventual;
req->eventual = eventual;
req->mona = mona;
req->source_addr = NULL;
req->tag = NULL;
req->size = NULL;
return NA_Msg_recv_expected(
mona->na_class, mona->na_context,
mona_callback, (void*)req,
......@@ -695,7 +741,7 @@ na_return_t mona_msg_recv_expected(
na_uint8_t source_id,
na_tag_t tag)
{
mona_request req = { ABT_EVENTUAL_NULL };
mona_request req = MONA_REQUEST_INITIALIZER;
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_msg_irecv_expected_internal(
......@@ -840,7 +886,12 @@ static na_return_t mona_iput_internal(
if(ret != 0)
return NA_NOMEM;
req->eventual = eventual;
req->eventual = eventual;
req->mona = mona;
req->source_addr = NULL;
req->tag = NULL;
req->size = NULL;
return NA_Put(mona->na_class, mona->na_context,
mona_callback, (void*)req,
local_mem_handle, local_offset,
......@@ -859,7 +910,7 @@ na_return_t mona_put(
na_addr_t remote_addr,
na_uint8_t remote_id)
{
mona_request req = { ABT_EVENTUAL_NULL };
mona_request req = MONA_REQUEST_INITIALIZER;
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_iput_internal(
......@@ -918,7 +969,12 @@ static na_return_t mona_iget_internal(
if(ret != 0)
return NA_NOMEM;
req->eventual = eventual;
req->eventual = eventual;
req->mona = mona;
req->source_addr = NULL;
req->tag = NULL;
req->size = NULL;
return NA_Get(mona->na_class, mona->na_context,
mona_callback, (void*)req,
local_mem_handle, local_offset,
......@@ -937,7 +993,7 @@ na_return_t mona_get(
na_addr_t remote_addr,
na_uint8_t remote_id)
{
mona_request req = { ABT_EVENTUAL_NULL };
mona_request req = MONA_REQUEST_INITIALIZER;
cached_op_id_t id = get_op_id_from_cache(mona);
na_op_id_t op_id = id->op_id;
na_return_t na_ret = mona_iget_internal(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment