Commit cff51cc2 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added mona_send_nc and mona_recv_nc as well as their non-blocking equivalents

parent 38382153
......@@ -342,6 +342,72 @@ na_return_t mona_isend(
na_tag_t tag,
mona_request_t* req);
/**
* @see Same as mona_send but for non-contiguous data.
* A mon_send_nc can be matched by a mona_recv or a mona_recv_nc
* on the destination.
*/
na_return_t mona_send_nc(
mona_instance_t mona,
na_size_t count,
const void * const* buffers,
const na_size_t* buf_sizes,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag);
/**
* @see Same as mona_isend but for non-contiguous data.
* A mon_send_nc can be matched by a mona_recv or a mona_recv_nc
* on the destination.
*/
na_return_t mona_isend_nc(
mona_instance_t mona,
na_size_t count,
const void * const* buffers,
const na_size_t* buf_sizes,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag,
mona_request_t* req);
/**
* @brief Send data using a memory handle.
* This operation should be matched by a mona_recv_mem
* on the destination.
*
* @param mona [IN] Mona instance
* @param mem [IN] Memory handle to send
* @param size [IN] Size of the data to send
* @param offset [IN] Offset of the data in the memory handle
* @param dest [IN] Destination address
* @param dest_id [IN] Destination id
* @param tag [IN] Tag
*
* @return NA_SUCCESS or corresponding NA error code
*/
na_return_t mona_send_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag);
/**
* @see Non-blocking version of mona_send_mem.
*/
na_return_t mona_isend_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag,
mona_request_t* req);
/**
* @brief High-level blocking recv function. This function will
* appropriatly use unexpected messages or combinations of unexpected,
......@@ -405,6 +471,78 @@ na_return_t mona_irecv(
na_tag_t* actual_tag,
mona_request_t* req);
/**
* @see Non-contiguous version of mona_recv.
* This function can match a mona_send or a mona_send_nc.
*/
na_return_t mona_recv_nc(
mona_instance_t mona,
na_size_t count,
void** buffers,
const na_size_t* buf_sizes,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag);
/**
* @see Non-blocking version of mona_recv_nc.
*/
na_return_t mona_irecv_nc(
mona_instance_t mona,
na_size_t count,
void** buffers,
const na_size_t* buf_sizes,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag,
mona_request_t* req);
/**
* @brief Receives data directly into a memory handle.
* This function should match a mona_send_mem.
*
* @param mona [IN] Mona instance
* @param mem [IN] Memory handle to receive data
* @param size [IN] Size of the data to receive
* @param offset [IN] Offset at which to place the data in the memory handle
* @param src [IN] Source address
* @param tag [IN] Tag
* @param actual_size [OUT] Actual size received
* @param actual_src [OUT] Actual source
* @param actual_tag [OUT] Actual tag
*
* @return NA_SUCCESS or corresponding NA error code
*/
na_return_t mona_recv_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag);
/**
* @see Non-blocking version of mona_recv_mem.
*/
na_return_t mona_irecv_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag,
mona_request_t* req);
/**
* Get the maximum size of messages supported by unexpected send/recv.
* Small message size.
......
......@@ -478,11 +478,91 @@ na_return_t mona_send(
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag)
{
return mona_send_nc(mona, 1, &buf, &buf_size, dest, dest_id, tag);
}
struct isend_args {
mona_instance_t mona;
const void* buf;
na_size_t buf_size;
na_addr_t dest;
na_uint8_t dest_id;
na_tag_t tag;
mona_request_t req;
};
static void isend_thread(void* x)
{
struct isend_args* args = (struct isend_args*)x;
na_return_t na_ret = mona_send(
args->mona,
args->buf,
args->buf_size,
args->dest,
args->dest_id,
args->tag);
ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
free(args);
}
na_return_t mona_isend(
mona_instance_t mona,
const void *buf,
na_size_t buf_size,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag,
mona_request_t* req)
{
ABT_eventual eventual;
int ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
if(ret != 0)
return NA_NOMEM;
struct isend_args* args = (struct isend_args*)malloc(sizeof(*args));
args->mona = mona;
args->buf = buf;
args->buf_size = buf_size;
args->dest = dest;
args->dest_id = dest_id;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
tmp_req->eventual = eventual;
args->req = tmp_req;
ret = ABT_thread_create(mona->progress_pool, isend_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
} else {
*req = tmp_req;
ABT_thread_yield();
}
return NA_SUCCESS;
}
na_return_t mona_send_nc(
mona_instance_t mona,
na_size_t count,
const void * const *buffers,
const na_size_t* buf_sizes,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag)
{
na_return_t na_ret = NA_SUCCESS;
na_mem_handle_t mem_handle = NA_MEM_HANDLE_NULL;
na_size_t msg_size = mona_msg_get_unexpected_header_size(mona) + 1 + buf_size;
na_size_t msg_size = mona_msg_get_unexpected_header_size(mona) + 1;
na_size_t data_size = 0;
cached_msg_t msg = get_msg_from_cache(mona);
unsigned i;
for(i = 0; i < count; i++) {
data_size += buf_sizes[i];
}
msg_size += data_size;
if(msg_size <= mona_msg_get_max_unexpected_size(mona)) {
......@@ -492,7 +572,11 @@ na_return_t mona_send(
char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
*p = HL_MSG_SMALL;
p += 1;
memcpy(p, buf, buf_size);
for(i = 0; i < count; i++) {
memcpy(p, buffers[i], buf_sizes[i]);
p += buf_sizes[i];
}
na_ret = mona_msg_send_unexpected(
mona, msg->buffer, msg_size,
......@@ -502,7 +586,16 @@ na_return_t mona_send(
} else {
// Expose user memory for RDMA
na_ret = mona_mem_handle_create(mona, (void*)buf, buf_size, NA_MEM_READ_ONLY, &mem_handle);
if(count == 1) {
na_ret = mona_mem_handle_create(mona, (void*)buffers[0], buf_sizes[0], NA_MEM_READ_ONLY, &mem_handle);
} else {
struct na_segment* segments = alloca(sizeof(*segments)*count);
for(i = 0; i < count; i++) {
segments[i].address = (na_ptr_t)buffers[i];
segments[i].size = buf_sizes[i];
}
na_ret = mona_mem_handle_create_segments(mona, segments, count, NA_MEM_READ_ONLY, &mem_handle);
}
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_mem_register(mona, mem_handle);
......@@ -529,8 +622,8 @@ na_return_t mona_send(
p += 1;
memcpy(p, &mem_handle_size, sizeof(mem_handle_size));
p += sizeof(mem_handle_size);
memcpy(p, &buf_size, sizeof(buf_size));
p += sizeof(buf_size);
memcpy(p, &data_size, sizeof(data_size));
p += sizeof(data_size);
na_ret = mona_mem_handle_serialize(mona, p, mem_handle_size, mem_handle);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
......@@ -577,23 +670,25 @@ finish:
return na_ret;
}
struct isend_args {
mona_instance_t mona;
const void* buf;
na_size_t buf_size;
na_addr_t dest;
na_uint8_t dest_id;
na_tag_t tag;
mona_request_t req;
struct isend_nc_args {
mona_instance_t mona;
na_size_t count;
const void* const* buffers;
const na_size_t* buf_sizes;
na_addr_t dest;
na_uint8_t dest_id;
na_tag_t tag;
mona_request_t req;
};
static void isend_thread(void* x)
static void isend_nc_thread(void* x)
{
struct isend_args* args = (struct isend_args*)x;
na_return_t na_ret = mona_send(
struct isend_nc_args* args = (struct isend_nc_args*)x;
na_return_t na_ret = mona_send_nc(
args->mona,
args->buf,
args->buf_size,
args->count,
args->buffers,
args->buf_sizes,
args->dest,
args->dest_id,
args->tag);
......@@ -601,10 +696,11 @@ static void isend_thread(void* x)
free(args);
}
na_return_t mona_isend(
na_return_t mona_isend_nc(
mona_instance_t mona,
const void *buf,
na_size_t buf_size,
na_size_t count,
const void * const* buffers,
const na_size_t* buf_sizes,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag,
......@@ -615,19 +711,20 @@ na_return_t mona_isend(
if(ret != 0)
return NA_NOMEM;
struct isend_args* args = (struct isend_args*)malloc(sizeof(*args));
args->mona = mona;
args->buf = buf;
args->buf_size = buf_size;
args->dest = dest;
args->dest_id = dest_id;
args->tag = tag;
struct isend_nc_args* args = (struct isend_nc_args*)malloc(sizeof(*args));
args->mona = mona;
args->count = count;
args->buffers = buffers;
args->buf_sizes = buf_sizes;
args->dest = dest;
args->dest_id = dest_id;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
tmp_req->eventual = eventual;
args->req = tmp_req;
ret = ABT_thread_create(mona->progress_pool, isend_thread, args, ABT_THREAD_ATTR_NULL, NULL);
ret = ABT_thread_create(mona->progress_pool, isend_nc_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
......@@ -772,6 +869,89 @@ na_return_t mona_recv(
na_addr_t* actual_src,
na_tag_t* actual_tag)
{
return mona_recv_nc(mona, 1, &buf, &size, src, tag, actual_size, actual_src, actual_tag);
}
struct irecv_args {
mona_instance_t mona;
void* buf;
na_size_t size;
na_addr_t src;
na_tag_t tag;
na_size_t* actual_size;
na_addr_t* actual_src;
na_tag_t* actual_tag;
mona_request_t req;
};
static void irecv_thread(void* x)
{
struct irecv_args* args = (struct irecv_args*)x;
na_return_t na_ret = mona_recv(
args->mona,
args->buf,
args->size,
args->src,
args->tag,
args->actual_size,
args->actual_src,
args->actual_tag);
ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
free(args);
}
na_return_t mona_irecv(
mona_instance_t mona,
void* buf,
na_size_t size,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag,
mona_request_t* req)
{
ABT_eventual eventual;
int ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
if(ret != 0)
return NA_NOMEM;
struct irecv_args* args = (struct irecv_args*)malloc(sizeof(*args));
args->mona = mona;
args->buf = buf;
args->size = size;
args->src = src;
args->actual_size = actual_size;
args->actual_src = actual_src;
args->actual_tag = actual_tag;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
args->req = tmp_req;
tmp_req->eventual = eventual;
ret = ABT_thread_create(mona->progress_pool, irecv_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
} else {
*req = tmp_req;
ABT_thread_yield();
}
return NA_SUCCESS;
}
na_return_t mona_recv_nc(
mona_instance_t mona,
na_size_t count,
void** buffers,
const na_size_t* buf_sizes,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag)
{
na_return_t na_ret = NA_SUCCESS;
na_mem_handle_t mem_handle = NA_MEM_HANDLE_NULL;
......@@ -781,6 +961,12 @@ na_return_t mona_recv(
na_size_t recv_size = 0;
na_addr_t recv_addr = NA_ADDR_NULL;
na_tag_t recv_tag = 0;
na_size_t max_data_size = 0;
unsigned i;
for(i = 0; i < count; i++){
max_data_size += buf_sizes[i];
}
// wait for a matching unexpected message to come around
msg = wait_for_matching_unexpected_message(mona, src, tag, &recv_size, &recv_addr, &recv_tag);
......@@ -795,9 +981,13 @@ na_return_t mona_recv(
p += 1;
recv_size -= header_size + 1;
recv_size = recv_size < size ? recv_size : size;
if(recv_size)
memcpy(buf, p, recv_size);
na_size_t remaining_size = recv_size;
for(i = 0; i < count && remaining_size != 0; i++) {
na_size_t s = remaining_size < buf_sizes[i] ? remaining_size : buf_sizes[i];
memcpy(buffers[i], p, s);
remaining_size -= s;
}
recv_size = recv_size < max_data_size ? recv_size : max_data_size;
} else if(*p == HL_MSG_LARGE) { // large message, using RDMA transfer
......@@ -812,8 +1002,17 @@ na_return_t mona_recv(
p += sizeof(data_size);
// expose user memory for RDMA
na_ret = mona_mem_handle_create(
mona, (void*)buf, size, NA_MEM_WRITE_ONLY, &mem_handle);
if(count == 1) {
na_ret = mona_mem_handle_create(
mona, (void*)buffers[0], buf_sizes[0], NA_MEM_WRITE_ONLY, &mem_handle);
} else {
struct na_segment* segments = alloca(sizeof(*segments)*count);
for(i = 0; i < count; i++) {
segments[i].address = (na_ptr_t)buffers[i];
segments[i].size = buf_sizes[i];
}
na_ret = mona_mem_handle_create_segments(mona, segments, count, NA_MEM_WRITE_ONLY, &mem_handle);
}
if(na_ret != NA_SUCCESS) goto finish;
na_ret = mona_mem_register(mona, mem_handle);
......@@ -826,11 +1025,12 @@ na_return_t mona_recv(
// Issue RDMA operation
// XXX how do we support a source id different from 0 ?
data_size = data_size < size ? data_size : size;
data_size = data_size < max_data_size ? data_size : max_data_size;
if(data_size) {
na_ret = mona_get(mona, mem_handle, 0, remote_handle, 0, data_size, recv_addr, 0);
if(na_ret != NA_SUCCESS) goto finish;
}
recv_size = data_size;
// Send ACK
na_size_t msg_size = header_size + 1;
......@@ -865,25 +1065,27 @@ finish:
return na_ret;
}
struct irecv_args {
mona_instance_t mona;
void* buf;
na_size_t size;
na_addr_t src;
na_tag_t tag;
na_size_t* actual_size;
na_addr_t* actual_src;
na_tag_t* actual_tag;
mona_request_t req;
struct irecv_nc_args {
mona_instance_t mona;
na_size_t count;
void** buffers;
const na_size_t* buf_sizes;
na_addr_t src;
na_tag_t tag;
na_size_t* actual_size;
na_addr_t* actual_src;
na_tag_t* actual_tag;
mona_request_t req;
};
static void irecv_thread(void* x)
static void irecv_nc_thread(void* x)
{
struct irecv_args* args = (struct irecv_args*)x;
na_return_t na_ret = mona_recv(
struct irecv_nc_args* args = (struct irecv_nc_args*)x;
na_return_t na_ret = mona_recv_nc(
args->mona,
args->buf,
args->size,
args->count,
args->buffers,
args->buf_sizes,
args->src,
args->tag,
args->actual_size,
......@@ -893,10 +1095,11 @@ static void irecv_thread(void* x)
free(args);
}
na_return_t mona_irecv(
na_return_t mona_irecv_nc(
mona_instance_t mona,
void* buf,
na_size_t size,
na_size_t count,
void** buffers,
const na_size_t* buf_sizes,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
......@@ -909,10 +1112,11 @@ na_return_t mona_irecv(
if(ret != 0)
return NA_NOMEM;
struct irecv_args* args = (struct irecv_args*)malloc(sizeof(*args));
struct irecv_nc_args* args = (struct irecv_nc_args*)malloc(sizeof(*args));
args->mona = mona;
args->buf = buf;
args->size = size;
args->count = count;
args->buffers = buffers;
args->buf_sizes = buf_sizes;
args->src = src;
args->actual_size = actual_size;
args->actual_src = actual_src;
......@@ -923,7 +1127,7 @@ na_return_t mona_irecv(
args->req = tmp_req;
tmp_req->eventual = eventual;
ret = ABT_thread_create(mona->progress_pool, irecv_thread, args, ABT_THREAD_ATTR_NULL, NULL);
ret = ABT_thread_create(mona->progress_pool, irecv_nc_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment