Commit 9ec9f756 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added high-level send/recv and associated tests

parent 11d28a65
......@@ -286,6 +286,109 @@ na_return_t mona_addr_deserialize(
const void *buf,
na_size_t buf_size);
/**
* @brief High-level blocking send function. This function will
* appropriatly use unexpected messages or combinations of unexpected,
* expected, and RDMA message depending on data size.
*
* Note: using a high-level function in conjunction with low-level
* (mona_msg_*) functions may lead to undefined behaviors and should
* be avoided.
*
* @param mona [IN/OUT] Mona instance
* @param buf [IN] data to send
* @param buf_size [IN] buffer size
* @param dest [IN] destination address
* @param dest_id [IN] destination context id
* @param tag [IN] tag
*
* @return NA_SUCCESS or corresponding NA error code
*/
na_return_t mona_send(
mona_instance_t mona,
const void *buf,
na_size_t buf_size,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag);
/**
* @brief Non-blocking version of mona_send. The resulting mona_request_t
* can be used in mona_wait() and mona_test.
*
* Note: using a high-level function in conjunction with low-level
* (mona_msg_*) functions may lead to undefined behaviors and should
* be avoided.
*
* @param mona [IN/OUT] Mona instance
* @param buf [IN] data to send
* @param buf_size [IN] buffer size
* @param dest [IN] destination address
* @param dest_id [IN] destination context id
* @param tag [IN] tag
* @param req [OUT] request
*
* @return NA_SUCCESS or corresponding NA error code
*/
na_return_t mona_isend(
mona_instance_t mona,
const void *buf,
na_size_t buf_size,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag,
mona_request_t* req);
/**
* @brief High-level blocking recv function. This function will
* appropriatly use unexpected messages or combinations of unexpected,
* expected, and RDMA message depending on data size, to match a
* call to mona_send or mona_isend from the source.
*
* Note: using a high-level function in conjunction with low-level
* (mona_msg_*) functions may lead to undefined behaviors and should
* be avoided.
*
* @param mona [IN/OUT] Mona instance
* @param buf [OUT] buffer in which to place the received data
* @param buf_size [IN] buffer size
* @param dest [IN] source address
* @param tag [IN] tag
* @param actual_size [OUT] actual received size
*
* @return NA_SUCCESS or corresponding NA error code
*/
na_return_t mona_recv(
mona_instance_t mona,
void* buf,
na_size_t buf_size,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size);
/**
* @brief Non-blocking equivalent of mona_recv. The resulting mona_request_t
* can be used in mona_wait() and mona_test.
*
* @param mona [IN/OUT] Mona instance
* @param buf [OUT] buffer in which to place the received data
* @param buf_size [IN] buffer size
* @param dest [IN] source address
* @param tag [IN] tag
* @param actual_size [OUT] actual received size
* @param req [OUT] request
*
* @return NA_SUCCESS or corresponding NA error code
*/
na_return_t mona_irecv(
mona_instance_t mona,
void* buf,
na_size_t buf_size,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
mona_request_t* req);
/**
* Get the maximum size of messages supported by unexpected send/recv.
* Small message size.
......
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MONA_TYPES_H
#define __MONA_TYPES_H
#include "mona.h"
#include <stdlib.h>
typedef struct cached_op_id* cached_op_id_t;
typedef struct cached_op_id {
na_op_id_t op_id;
cached_op_id_t next;
} cached_op_id;
typedef struct cached_msg* cached_msg_t;
typedef struct cached_msg {
char* buffer;
void* plugin_data;
cached_msg_t next;
} cached_msg;
typedef struct mona_instance {
// NA structures
na_class_t* na_class;
na_context_t* na_context;
// ABT structures
ABT_pool progress_pool;
ABT_xstream progress_xstream;
ABT_thread progress_thread;
// ownership information
na_bool_t owns_progress_pool;
na_bool_t owns_progress_xstream;
na_bool_t owns_na_class_and_context;
// finalization
na_bool_t finalize_flag;
// operation id cache
cached_op_id_t op_id_cache;
ABT_mutex op_id_cache_mtx;
// request cache
mona_request_t req_cache;
ABT_mutex req_cache_mtx;
// message cache for high-level functions
cached_msg_t msg_cache;
ABT_mutex msg_cache_mtx;
} mona_instance;
typedef struct mona_request {
ABT_eventual eventual;
mona_instance_t mona;
na_addr_t* source_addr;
na_tag_t* tag;
na_size_t* size;
mona_request_t next; // for the request cache
} mona_request;
#define MONA_REQUEST_INITIALIZER { ABT_EVENTUAL_NULL, NULL, NULL, NULL, NULL, NULL }
typedef enum hl_msg_type {
HL_MSG_SMALL,
HL_MSG_LARGE
} hl_msg_type;
// Operation ID cache -------------------------------------------------------
static inline cached_op_id_t get_op_id_from_cache(mona_instance_t mona)
{
cached_op_id_t id;
ABT_mutex_lock(mona->op_id_cache_mtx);
if(mona->op_id_cache) {
id = mona->op_id_cache;
mona->op_id_cache = id->next;
id->next = NULL;
} else {
na_op_id_t op_id = NA_Op_create(mona->na_class);
id = (cached_op_id_t)calloc(1, sizeof(*id));
id->op_id = op_id;
}
ABT_mutex_unlock(mona->op_id_cache_mtx);
return id;
}
static inline void return_op_id_to_cache(mona_instance_t mona, cached_op_id_t id)
{
ABT_mutex_lock(mona->op_id_cache_mtx);
cached_op_id_t head = mona->op_id_cache;
id->next = head;
mona->op_id_cache = id;
ABT_mutex_unlock(mona->op_id_cache_mtx);
}
static inline void clear_op_id_cache(mona_instance_t mona)
{
ABT_mutex_lock(mona->op_id_cache_mtx);
cached_op_id_t cached_op = mona->op_id_cache;
mona->op_id_cache = NULL;
while(cached_op) {
cached_op_id_t tmp = cached_op->next;
NA_Op_destroy(mona->na_class, cached_op->op_id);
free(cached_op);
cached_op = tmp;
}
ABT_mutex_unlock(mona->op_id_cache_mtx);
}
// Request cache ----------------------------------------------------------
static inline mona_request_t get_req_from_cache(mona_instance_t mona)
{
mona_request_t req;
ABT_mutex_lock(mona->req_cache_mtx);
if(mona->req_cache) {
req = mona->req_cache;
mona->req_cache = req->next;
req->next = NULL;
} else {
req = (mona_request_t)calloc(1, sizeof(*req));
}
ABT_mutex_unlock(mona->req_cache_mtx);
return req;
}
static inline void return_req_to_cache(mona_instance_t mona, mona_request_t req)
{
ABT_mutex_lock(mona->req_cache_mtx);
mona_request_t head = mona->req_cache;
req->next = head;
mona->req_cache = req;
ABT_mutex_unlock(mona->req_cache_mtx);
}
static inline void clear_req_cache(mona_instance_t mona)
{
ABT_mutex_lock(mona->req_cache_mtx);
mona_request_t cached_req = mona->req_cache;
mona->req_cache = NULL;
while(cached_req) {
mona_request_t tmp = cached_req->next;
free(cached_req);
cached_req = tmp;
}
ABT_mutex_unlock(mona->req_cache_mtx);
}
// Message cache -------------------------------------------------------
static inline cached_msg_t get_msg_from_cache(mona_instance_t mona)
{
cached_msg_t msg;
ABT_mutex_lock(mona->msg_cache_mtx);
if(mona->msg_cache) {
msg = mona->msg_cache;
mona->msg_cache = msg->next;
msg->next = NULL;
} else {
msg = (cached_msg_t)calloc(1, sizeof(*msg));
msg->buffer = (char*)mona_msg_buf_alloc(mona,
mona_msg_get_max_unexpected_size(mona),
&(msg->plugin_data));
}
ABT_mutex_unlock(mona->msg_cache_mtx);
return msg;
}
static inline void return_msg_to_cache(mona_instance_t mona, cached_msg_t msg)
{
ABT_mutex_lock(mona->msg_cache_mtx);
cached_msg_t head = mona->msg_cache;
msg->next = head;
mona->msg_cache = msg;
ABT_mutex_unlock(mona->msg_cache_mtx);
}
static inline void clear_msg_cache(mona_instance_t mona)
{
ABT_mutex_lock(mona->msg_cache_mtx);
cached_msg_t msg = mona->msg_cache;
mona->msg_cache = NULL;
while(msg) {
cached_msg_t tmp = msg->next;
mona_msg_buf_free(mona, msg->buffer, msg->plugin_data);
free(msg);
msg = tmp;
}
ABT_mutex_unlock(mona->msg_cache_mtx);
}
// Wait --------------------------------------------------------------
static inline na_return_t mona_wait_internal(mona_request_t req)
{
na_return_t* waited_na_ret = NULL;
na_return_t na_ret = NA_SUCCESS;
ABT_eventual_wait(req->eventual, (void**)&waited_na_ret);
na_ret = *waited_na_ret;
ABT_eventual_free(&(req->eventual));
return na_ret;
}
#endif
......@@ -3,41 +3,12 @@
*
* See COPYRIGHT in top-level directory.
*/
#include "mona.h"
#include <stdlib.h>
typedef struct cached_op_id* cached_op_id_t;
typedef struct cached_op_id {
na_op_id_t op_id;
cached_op_id_t next;
} cached_op_id;
typedef struct mona_instance {
na_class_t* na_class;
na_context_t* na_context;
ABT_pool progress_pool;
ABT_xstream progress_xstream;
ABT_thread progress_thread;
na_bool_t owns_progress_pool;
na_bool_t owns_progress_xstream;
na_bool_t owns_na_class_and_context;
na_bool_t finalize_flag;
cached_op_id_t op_id_cache;
ABT_mutex op_id_cache_mtx;
mona_request_t req_cache;
ABT_mutex req_cache_mtx;
} mona_instance;
typedef struct mona_request {
ABT_eventual eventual;
mona_instance_t mona;
na_addr_t* source_addr;
na_tag_t* tag;
na_size_t* size;
mona_request_t next; // for the request cache
} mona_request;
#define MONA_REQUEST_INITIALIZER { ABT_EVENTUAL_NULL, NULL, NULL, NULL, NULL, NULL }
#include "mona-types.h"
#include <string.h>
// ------------------------------------------------------------------------------------
// Mona progress loop logic
// ------------------------------------------------------------------------------------
static void mona_progress_loop(void* uarg) {
mona_instance_t mona = (mona_instance_t)uarg;
......@@ -50,7 +21,7 @@ static void mona_progress_loop(void* uarg) {
do {
trigger_ret = NA_Trigger(mona->na_context, 0, 1, NULL, &actual_count);
} while ((trigger_ret == NA_SUCCESS) && actual_count && !mona->finalize_flag);
ABT_pool_get_size(mona->progress_pool, &size);
if(size)
ABT_thread_yield();
......@@ -64,6 +35,10 @@ static void mona_progress_loop(void* uarg) {
}
}
// ------------------------------------------------------------------------------------
// Mona initialization logic
// ------------------------------------------------------------------------------------
mona_instance_t mona_init(
const char *info_string,
na_bool_t listen,
......@@ -176,10 +151,13 @@ mona_instance_t mona_init_na_pool(
mona->progress_thread = ABT_THREAD_NULL;
mona->op_id_cache_mtx = ABT_MUTEX_NULL;
mona->req_cache_mtx = ABT_MUTEX_NULL;
mona->msg_cache_mtx = ABT_MUTEX_NULL;
ret = ABT_mutex_create(&(mona->op_id_cache_mtx));
if(ret != ABT_SUCCESS) goto error;
ret = ABT_mutex_create(&(mona->req_cache_mtx));
if(ret != ABT_SUCCESS) goto error;
ret = ABT_mutex_create(&(mona->msg_cache_mtx));
if(ret != ABT_SUCCESS) goto error;
mona->op_id_cache = (cached_op_id_t)calloc(1, sizeof(*(mona->op_id_cache)));
mona->op_id_cache->op_id = NA_Op_create(na_class);
......@@ -220,25 +198,15 @@ na_return_t mona_finalize(mona_instance_t mona)
if(mona->owns_progress_pool)
ABT_pool_free(&(mona->progress_pool));
cached_op_id_t cached_op = mona->op_id_cache;
mona->op_id_cache = NULL;
while(cached_op) {
cached_op_id_t tmp = cached_op->next;
NA_Op_destroy(mona->na_class, cached_op->op_id);
free(cached_op);
cached_op = tmp;
}
clear_op_id_cache(mona);
ABT_mutex_free(&(mona->op_id_cache_mtx));
mona_request_t cached_req = mona->req_cache;
mona->req_cache = NULL;
while(cached_req) {
mona_request_t tmp = cached_req->next;
free(cached_req);
cached_req = tmp;
}
clear_req_cache(mona);
ABT_mutex_free(&(mona->req_cache_mtx));
clear_msg_cache(mona);
ABT_mutex_free(&(mona->msg_cache_mtx));
if(mona->owns_na_class_and_context) {
NA_Context_destroy(
mona->na_class,
......@@ -250,6 +218,10 @@ na_return_t mona_finalize(mona_instance_t mona)
return NA_SUCCESS;
}
// ------------------------------------------------------------------------------------
// Mona info access logic
// ------------------------------------------------------------------------------------
const char* mona_get_class_name(mona_instance_t mona)
{
return NA_Get_class_name(mona->na_class);
......@@ -265,6 +237,10 @@ na_bool_t mona_is_listening(mona_instance_t mona)
return NA_Is_listening(mona->na_class);
}
// ------------------------------------------------------------------------------------
// Mona addresses logic
// ------------------------------------------------------------------------------------
na_return_t mona_addr_lookup(
mona_instance_t mona,
const char *name,
......@@ -351,6 +327,10 @@ na_return_t mona_addr_deserialize(
return NA_Addr_deserialize(mona->na_class, addr, buf, buf_size);
}
// ------------------------------------------------------------------------------------
// Mona message information logic
// ------------------------------------------------------------------------------------
na_size_t mona_msg_get_max_unexpected_size(
mona_instance_t mona)
{
......@@ -380,6 +360,10 @@ na_tag_t mona_msg_get_max_tag(mona_instance_t mona)
return NA_Msg_get_max_tag(mona->na_class);
}
// ------------------------------------------------------------------------------------
// Mona operation logic
// ------------------------------------------------------------------------------------
na_op_id_t mona_op_create(mona_instance_t mona)
{
return NA_Op_create(mona->na_class);
......@@ -392,55 +376,9 @@ na_return_t mona_op_destroy(
return NA_Op_destroy(mona->na_class, op_id);
}
static cached_op_id_t get_op_id_from_cache(mona_instance_t mona)
{
cached_op_id_t id;
ABT_mutex_lock(mona->op_id_cache_mtx);
if(mona->op_id_cache) {
id = mona->op_id_cache;
mona->op_id_cache = id->next;
id->next = NULL;
} else {
na_op_id_t op_id = NA_Op_create(mona->na_class);
id = (cached_op_id_t)calloc(1, sizeof(*id));
id->op_id = op_id;
}
ABT_mutex_unlock(mona->op_id_cache_mtx);
return id;
}
static void return_op_id_to_cache(mona_instance_t mona, cached_op_id_t id)
{
ABT_mutex_lock(mona->op_id_cache_mtx);
cached_op_id_t head = mona->op_id_cache;
id->next = head;
mona->op_id_cache = id;
ABT_mutex_unlock(mona->op_id_cache_mtx);
}
static void return_req_to_cache(mona_instance_t mona, mona_request_t req)
{
ABT_mutex_lock(mona->req_cache_mtx);
mona_request_t head = mona->req_cache;
req->next = head;
mona->req_cache = req;
ABT_mutex_unlock(mona->req_cache_mtx);
}
static mona_request_t get_req_from_cache(mona_instance_t mona)
{
mona_request_t req;
ABT_mutex_lock(mona->req_cache_mtx);
if(mona->req_cache) {
req = mona->req_cache;
mona->req_cache = req->next;
req->next = NULL;
} else {
req = (mona_request_t)calloc(1, sizeof(*req));
}
ABT_mutex_unlock(mona->req_cache_mtx);
return req;
}
// ------------------------------------------------------------------------------------
// Mona message buffer logic
// ------------------------------------------------------------------------------------
void* mona_msg_buf_alloc(
mona_instance_t mona,
......@@ -466,17 +404,9 @@ na_return_t mona_msg_init_unexpected(
return NA_Msg_init_unexpected(mona->na_class, buf, buf_size);
}
static na_return_t mona_wait_internal(mona_request_t req)
{
na_return_t* waited_na_ret = NULL;
na_return_t na_ret = NA_SUCCESS;
ABT_eventual_wait(req->eventual, (void**)&waited_na_ret);
na_ret = *waited_na_ret;
ABT_eventual_free(&(req->eventual));
return na_ret;
}
// ------------------------------------------------------------------------------------
// Mona request logic
// ------------------------------------------------------------------------------------
na_return_t mona_wait(mona_request_t req)
{
......@@ -513,6 +443,227 @@ static int mona_callback(const struct na_cb_info *info)
return NA_SUCCESS;
}
// ------------------------------------------------------------------------------------
// Mona high-level send/recv logic
// ------------------------------------------------------------------------------------
na_return_t mona_send(
mona_instance_t mona,
const void *buf,
na_size_t buf_size,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag)
{
na_return_t na_ret = NA_SUCCESS;
na_mem_handle_t mem_handle = NA_MEM_HANDLE_NULL;
na_size_t msg_size = mona_msg_get_unexpected_header_size(mona) + 1 + buf_size;
cached_msg_t msg = get_msg_from_cache(mona);
if(msg_size <= mona_msg_get_max_unexpected_size(mona)) {
na_ret = mona_msg_init_unexpected(mona, msg->buffer, msg_size);
if(na_ret != NA_SUCCESS) goto finish;
char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
*p = HL_MSG_SMALL;
p += 1;
memcpy(p, buf, buf_size);
na_ret = mona_msg_send_unexpected(
mona, msg->buffer, msg_size,
msg->plugin_data, dest,
dest_id, tag);
} else {
// Expose user memory for RDMA
na_ret = mona_mem_handle_create(mona, (void*)buf, buf_size, NA_MEM_READ_ONLY, &mem_handle);
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_mem_register(mona, mem_handle);
if(na_ret != NA_SUCCESS) goto finish;
na_size_t mem_handle_size = mona_mem_handle_get_serialize_size(mona, mem_handle);
// Initialize message to send
msg_size = mona_msg_get_unexpected_header_size(mona) // NA header
+ 1 // type of message (HL_MSG_*)
+ sizeof(na_size_t) // size of the serialize handle
+ sizeof(na_size_t) // size of the data
+ mem_handle_size;
na_ret = mona_msg_init_unexpected(mona, msg->buffer, msg_size);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
goto finish;
}
// Fill in the message
char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
*p = HL_MSG_LARGE;
p += 1;
memcpy(p, &mem_handle_size, sizeof(mem_handle_size));
p += sizeof(mem_handle_size);
memcpy(p, &buf_size, sizeof(buf_size));
p += sizeof(buf_size);
na_ret = mona_mem_handle_serialize(mona, p, mem_handle_size, mem_handle);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
goto finish;
}
// Initialize ack message to receive
cached_msg_t ack_msg = get_msg_from_cache(mona);
na_size_t ack_msg_size = mona_msg_get_unexpected_header_size(mona) + 1;
mona_request_t ack_req = MONA_REQUEST_NULL;
cached_op_id_t ack_cache_id = get_op_id_from_cache(mona);
na_op_id_t ack_op_id = ack_cache_id->op_id;
// Issue non-blocking receive for ACK
na_ret = mona_msg_irecv_expected(mona, ack_msg->buffer, ack_msg_size,
ack_msg->plugin_data, dest, dest_id, tag, &ack_op_id, &ack_req);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
return_op_id_to_cache(mona, ack_cache_id);
goto finish;
}