Commit 03300054 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added isend/irecv and corresponding tests

parent 9ec9f756
......@@ -553,6 +553,30 @@ finish:
return na_ret;
}
struct isend_args {
mona_instance_t mona;
const void* buf;
na_size_t buf_size;
na_addr_t dest;
na_uint8_t dest_id;
na_tag_t tag;
mona_request_t req;
};
static void isend_thread(void* x)
{
struct isend_args* args = (struct isend_args*)x;
na_return_t na_ret = mona_send(
args->mona,
args->buf,
args->buf_size,
args->dest,
args->dest_id,
args->tag);
ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
free(args);
}
na_return_t mona_isend(
mona_instance_t mona,
const void *buf,
......@@ -562,7 +586,32 @@ na_return_t mona_isend(
na_tag_t tag,
mona_request_t* req)
{
// TODO
ABT_eventual eventual;
int ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
if(ret != 0)
return NA_NOMEM;
struct isend_args* args = (struct isend_args*)malloc(sizeof(*args));
args->mona = mona;
args->buf = buf;
args->buf_size = buf_size;
args->dest = dest;
args->dest_id = dest_id;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
tmp_req->eventual = eventual;
args->req = tmp_req;
ret = ABT_thread_create(mona->progress_pool, isend_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
} else {
*req = tmp_req;
ABT_thread_yield();
}
return NA_SUCCESS;
}
na_return_t mona_recv(
......@@ -648,6 +697,30 @@ finish:
return na_ret;
}
struct irecv_args {
mona_instance_t mona;
void* buf;
na_size_t buf_size;
na_addr_t src;
na_tag_t tag;
na_size_t* actual_size;
mona_request_t req;
};
static void irecv_thread(void* x)
{
struct irecv_args* args = (struct irecv_args*)x;
na_return_t na_ret = mona_recv(
args->mona,
args->buf,
args->buf_size,
args->src,
args->tag,
args->actual_size);
ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
free(args);
}
na_return_t mona_irecv(
mona_instance_t mona,
void* buf,
......@@ -657,7 +730,32 @@ na_return_t mona_irecv(
na_size_t* actual_size,
mona_request_t* req)
{
// TODO
ABT_eventual eventual;
int ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
if(ret != 0)
return NA_NOMEM;
struct irecv_args* args = (struct irecv_args*)malloc(sizeof(*args));
args->mona = mona;
args->buf = buf;
args->buf_size = buf_size;
args->src = src;
args->actual_size = actual_size;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
args->req = tmp_req;
tmp_req->eventual = eventual;
ret = ABT_thread_create(mona->progress_pool, irecv_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
} else {
*req = tmp_req;
ABT_thread_yield();
}
return NA_SUCCESS;
}
// ------------------------------------------------------------------------------------
......
......@@ -10,6 +10,12 @@ target_include_directories (test-send-recv PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_link_libraries (test-send-recv mona)
add_executable (test-isend-irecv test-isend-irecv.c munit/munit.c)
target_include_directories (test-isend-irecv PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/munit
${CMAKE_CURRENT_SOURCE_DIR}/../include)
target_link_libraries (test-isend-irecv mona)
add_executable (test-send-recv-unexpected test-send-recv-unexpected.c munit/munit.c)
target_include_directories (test-send-recv-unexpected PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/munit
......@@ -37,6 +43,7 @@ target_link_libraries (test-na mona)
add_test (NAME TestInit COMMAND ./test-init)
add_test (NAME TestSendRecv COMMAND mpirun -np 2 ./test-send-recv)
add_test (NAME TestISendIRecv COMMAND mpirun -np 2 ./test-isend-irecv)
add_test (NAME TestSendRecvUnexpected COMMAND mpirun -np 2 ./test-send-recv-unexpected)
add_test (NAME TestSendRecvExpected COMMAND mpirun -np 2 ./test-send-recv-expected)
add_test (NAME TestPutGet COMMAND mpirun -np 2 ./test-put-get)
......
#include <mpi.h>
#include "munit/munit.h"
#include "mona.h"
typedef struct {
mona_instance_t mona;
int rank;
na_addr_t self_addr;
na_addr_t other_addr;
} test_context;
static void* test_context_setup(const MunitParameter params[], void* user_data)
{
(void)params;
(void)user_data;
int ret;
MPI_Init(NULL, NULL);
ABT_init(0, NULL);
mona_instance_t mona = mona_init("ofi+tcp", NA_TRUE, NULL);
test_context* context = (test_context*)calloc(1, sizeof(*context));
context->mona = mona;
MPI_Comm_rank(MPI_COMM_WORLD, &(context->rank));
ret = mona_addr_self(mona, &(context->self_addr));
munit_assert_int(ret, ==, NA_SUCCESS);
char self_addr_str[128];
na_size_t self_addr_size = 128;
ret = mona_addr_to_string(mona, self_addr_str, &self_addr_size, context->self_addr);
munit_assert_int(ret, ==, NA_SUCCESS);
char other_addr_str[128];
MPI_Sendrecv(self_addr_str, 128, MPI_BYTE, (context->rank + 1) % 2, 0,
other_addr_str, 128, MPI_BYTE, (context->rank + 1) % 2, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
ret = mona_addr_lookup(mona, other_addr_str, &(context->other_addr));
munit_assert_int(ret, ==, NA_SUCCESS);
return context;
}
static void test_context_tear_down(void* fixture)
{
MPI_Barrier(MPI_COMM_WORLD);
test_context* context = (test_context*)fixture;
mona_addr_free(context->mona, context->self_addr);
mona_addr_free(context->mona, context->other_addr);
mona_finalize(context->mona);
free(context);
ABT_finalize();
MPI_Finalize();
}
static MunitResult test_isend_irecv(const MunitParameter params[], void* data)
{
(void)params;
test_context* context = (test_context*)data;
na_return_t ret;
mona_instance_t mona = context->mona;
char* buf = malloc(8192);
na_size_t msg_len = 8192;
if(context->rank == 0) { // sender
int i;
for(i = 0; i < (int)msg_len; i++) {
buf[i] = i % 32;
}
mona_request_t req;
ret = mona_isend(mona, buf, msg_len, context->other_addr, 0, 1234, &req);
munit_assert_int(ret, ==, NA_SUCCESS);
ret = mona_wait(req);
munit_assert_int(ret, ==, NA_SUCCESS);
na_size_t recv_size;
ret = mona_irecv(mona, buf, 64, context->other_addr, 1234, &recv_size, &req);
munit_assert_int(ret, ==, NA_SUCCESS);
ret = mona_wait(req);
munit_assert_int(ret, ==, NA_SUCCESS);
for(i = 0; i < 64; i++) {
munit_assert_int(buf[i], ==, (i+1) % 32);
}
} else { // receiver
int i;
mona_request_t req;
na_size_t recv_size;
ret = mona_irecv(mona, buf, msg_len, context->other_addr, 1234, &recv_size, &req);
munit_assert_int(ret, ==, NA_SUCCESS);
ret = mona_wait(req);
munit_assert_int(ret, ==, NA_SUCCESS);
for(i = 0; i < (int)msg_len; i++) {
munit_assert_int(buf[i], ==, i % 32);
}
for(i=0; i < 64; i++) {
buf[i] = (i+1) % 32;
}
ret = mona_isend(mona, buf, 64, context->other_addr, 0, 1234, &req);
munit_assert_int(ret, ==, NA_SUCCESS);
ret = mona_wait(req);
munit_assert_int(ret, ==, NA_SUCCESS);
}
return MUNIT_OK;
}
static MunitTest test_suite_tests[] = {
{ (char*) "/hl", test_isend_irecv, test_context_setup, test_context_tear_down, MUNIT_TEST_OPTION_NONE, NULL },
{ NULL, NULL, NULL, NULL, MUNIT_TEST_OPTION_NONE, NULL }
};
static const MunitSuite test_suite = {
(char*) "/mona/isend-irecv", test_suite_tests, NULL, 1, MUNIT_SUITE_OPTION_NONE
};
int main(int argc, char* argv[MUNIT_ARRAY_PARAM(argc + 1)]) {
return munit_suite_main(&test_suite, (void*) "mona", argc, argv);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment