Commit 2002d1b9 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added implementations of mona_recv_mem and mona_send_mem and corresponding non-blocking functions

parent cff51cc2
......@@ -601,71 +601,14 @@ na_return_t mona_send_nc(
na_ret = mona_mem_register(mona, mem_handle);
if(na_ret != NA_SUCCESS) goto finish;
na_size_t mem_handle_size = mona_mem_handle_get_serialize_size(mona, mem_handle);
// Initialize message to send
msg_size = mona_msg_get_unexpected_header_size(mona) // NA header
+ 1 // type of message (HL_MSG_*)
+ sizeof(na_size_t) // size of the serialize handle
+ sizeof(na_size_t) // size of the data
+ mem_handle_size;
na_ret = mona_msg_init_unexpected(mona, msg->buffer, msg_size);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
goto finish;
}
// Fill in the message
char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
*p = HL_MSG_LARGE;
p += 1;
memcpy(p, &mem_handle_size, sizeof(mem_handle_size));
p += sizeof(mem_handle_size);
memcpy(p, &data_size, sizeof(data_size));
p += sizeof(data_size);
na_ret = mona_mem_handle_serialize(mona, p, mem_handle_size, mem_handle);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
goto finish;
}
// Initialize ack message to receive
cached_msg_t ack_msg = get_msg_from_cache(mona);
na_size_t ack_msg_size = mona_msg_get_unexpected_header_size(mona) + 1;
mona_request_t ack_req = MONA_REQUEST_NULL;
cached_op_id_t ack_cache_id = get_op_id_from_cache(mona);
na_op_id_t ack_op_id = ack_cache_id->op_id;
// Issue non-blocking receive for ACK
na_ret = mona_msg_irecv_expected(mona, ack_msg->buffer, ack_msg_size,
ack_msg->plugin_data, dest, dest_id, tag, &ack_op_id, &ack_req);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
return_op_id_to_cache(mona, ack_cache_id);
goto finish;
}
// Issue send of message with mem handle
na_ret = mona_msg_send_unexpected(
mona, msg->buffer, msg_size,
msg->plugin_data, dest,
dest_id, tag);
if(na_ret != NA_SUCCESS) {
mona_mem_deregister(mona, mem_handle);
mona_cancel(mona, ack_op_id);
return_op_id_to_cache(mona, ack_cache_id);
goto finish;
}
// Wait for acknowledgement
na_ret = mona_wait(ack_req);
return_op_id_to_cache(mona, ack_cache_id);
na_ret = mona_send_mem(mona, mem_handle, data_size, 0, dest, dest_id, tag);
mona_mem_deregister(mona, mem_handle);
}
finish:
if(mem_handle != NA_MEM_HANDLE_NULL)
if(mem_handle != NA_MEM_HANDLE_NULL) {
mona_mem_handle_free(mona, mem_handle);
}
return_msg_to_cache(mona, msg);
return na_ret;
}
......@@ -735,6 +678,151 @@ na_return_t mona_isend_nc(
return NA_SUCCESS;
}
na_return_t mona_send_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag)
{
na_return_t na_ret = NA_SUCCESS;
na_size_t msg_size = 0;
cached_msg_t msg = get_msg_from_cache(mona);
na_size_t mem_handle_size = mona_mem_handle_get_serialize_size(mona, mem);
// Initialize message to send
msg_size = mona_msg_get_unexpected_header_size(mona) // NA header
+ 1 // type of message (HL_MSG_*)
+ sizeof(na_size_t) // size of the serialized handle
+ sizeof(na_size_t) // size of the data
+ sizeof(na_size_t) // offset in handle
+ mem_handle_size;
na_ret = mona_msg_init_unexpected(mona, msg->buffer, msg_size);
if(na_ret != NA_SUCCESS)
goto finish;
// Fill in the message
char* p = msg->buffer + mona_msg_get_unexpected_header_size(mona);
*p = HL_MSG_LARGE;
p += 1;
// size of serialized handle
memcpy(p, &mem_handle_size, sizeof(mem_handle_size));
p += sizeof(mem_handle_size);
// size of the data
memcpy(p, &size, sizeof(size));
p += sizeof(size);
// offset in the handle
memcpy(p, &offset, sizeof(offset));
p += sizeof(offset);
// serialized handle
na_ret = mona_mem_handle_serialize(mona, p, mem_handle_size, mem);
if(na_ret != NA_SUCCESS)
goto finish;
// Initialize ack message to receive
cached_msg_t ack_msg = get_msg_from_cache(mona);
na_size_t ack_msg_size = mona_msg_get_unexpected_header_size(mona) + 1;
mona_request_t ack_req = MONA_REQUEST_NULL;
cached_op_id_t ack_cache_id = get_op_id_from_cache(mona);
na_op_id_t ack_op_id = ack_cache_id->op_id;
// Issue non-blocking receive for ACK
na_ret = mona_msg_irecv_expected(mona, ack_msg->buffer, ack_msg_size,
ack_msg->plugin_data, dest, dest_id, tag, &ack_op_id, &ack_req);
if(na_ret != NA_SUCCESS) {
return_op_id_to_cache(mona, ack_cache_id);
goto finish;
}
// Issue send of message with mem handle
na_ret = mona_msg_send_unexpected(
mona, msg->buffer, msg_size,
msg->plugin_data, dest,
dest_id, tag);
if(na_ret != NA_SUCCESS) {
mona_cancel(mona, ack_op_id);
return_op_id_to_cache(mona, ack_cache_id);
goto finish;
}
// Wait for acknowledgement
na_ret = mona_wait(ack_req);
return_op_id_to_cache(mona, ack_cache_id);
finish:
return_msg_to_cache(mona, msg);
return na_ret;
}
struct isend_mem_args {
mona_instance_t mona;
na_mem_handle_t mem;
na_size_t size;
na_size_t offset;
na_addr_t dest;
na_uint8_t dest_id;
na_tag_t tag;
mona_request_t req;
};
static void isend_mem_thread(void* x)
{
struct isend_mem_args* args = (struct isend_mem_args*)x;
na_return_t na_ret = mona_send_mem(
args->mona,
args->mem,
args->size,
args->offset,
args->dest,
args->dest_id,
args->tag);
ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
free(args);
}
na_return_t mona_isend_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t dest,
na_uint8_t dest_id,
na_tag_t tag,
mona_request_t* req)
{
ABT_eventual eventual;
int ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
if(ret != 0)
return NA_NOMEM;
struct isend_mem_args* args = (struct isend_mem_args*)malloc(sizeof(*args));
args->mona = mona;
args->mem = mem;
args->size = size;
args->offset = offset;
args->dest = dest;
args->dest_id = dest_id;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
tmp_req->eventual = eventual;
args->req = tmp_req;
ret = ABT_thread_create(mona->progress_pool, isend_mem_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
} else {
*req = tmp_req;
ABT_thread_yield();
}
return NA_SUCCESS;
}
static cached_msg_t wait_for_matching_unexpected_message(
mona_instance_t mona,
na_addr_t src,
......@@ -994,12 +1082,16 @@ na_return_t mona_recv_nc(
p += 1;
na_size_t mem_handle_size;
na_size_t data_size;
na_size_t remote_offset;
// read the size of the serialize mem handle
memcpy(&mem_handle_size, p, sizeof(mem_handle_size));
p += sizeof(mem_handle_size);
// read the size of the data associated with the mem handle
memcpy(&data_size, p, sizeof(data_size));
p += sizeof(data_size);
// read the offset
memcpy(&remote_offset, p, sizeof(remote_offset));
p += sizeof(remote_offset);
// expose user memory for RDMA
if(count == 1) {
......@@ -1027,7 +1119,7 @@ na_return_t mona_recv_nc(
// XXX how do we support a source id different from 0 ?
data_size = data_size < max_data_size ? data_size : max_data_size;
if(data_size) {
na_ret = mona_get(mona, mem_handle, 0, remote_handle, 0, data_size, recv_addr, 0);
na_ret = mona_get(mona, mem_handle, 0, remote_handle, remote_offset, data_size, recv_addr, 0);
if(na_ret != NA_SUCCESS) goto finish;
}
recv_size = data_size;
......@@ -1138,6 +1230,164 @@ na_return_t mona_irecv_nc(
return NA_SUCCESS;
}
na_return_t mona_recv_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag)
{
na_return_t na_ret = NA_SUCCESS;
na_mem_handle_t remote_handle = NA_MEM_HANDLE_NULL;
na_size_t header_size = mona_msg_get_unexpected_header_size(mona);
cached_msg_t msg = NULL;
na_size_t recv_size = 0;
na_addr_t recv_addr = NA_ADDR_NULL;
na_tag_t recv_tag = 0;
// wait for a matching unexpected message to come around
msg = wait_for_matching_unexpected_message(mona, src, tag, &recv_size, &recv_addr, &recv_tag);
if(!msg) return NA_PROTOCOL_ERROR;
// At this point, we know msg is the message we are looking for
// and the attributes are recv_size, recv_tag, and recv_addr
char* p = msg->buffer + header_size + 1;
na_size_t mem_handle_size;
na_size_t remote_data_size;
na_size_t remote_offset;
// read the size of the serialize mem handle
memcpy(&mem_handle_size, p, sizeof(mem_handle_size));
p += sizeof(mem_handle_size);
// read the size of the data associated with the mem handle
memcpy(&remote_data_size, p, sizeof(remote_data_size));
p += sizeof(remote_data_size);
// read the remote offset
memcpy(&remote_offset, p, sizeof(remote_offset));
p += sizeof(remote_offset);
// Deserialize remote memory handle
na_ret = mona_mem_handle_deserialize(
mona, &remote_handle, p, mem_handle_size);
if(na_ret != NA_SUCCESS) goto finish;
// Issue RDMA operation
// XXX how do we support a source id different from 0 ?
recv_size = remote_data_size < size ? remote_data_size : size;
if(recv_size) {
na_ret = mona_get(mona, mem, 0, remote_handle, 0, recv_size, recv_addr, 0);
if(na_ret != NA_SUCCESS) goto finish;
}
// Send ACK
na_size_t msg_size = header_size + 1;
msg->buffer[msg_size-1] = 0;
na_ret = mona_msg_init_expected(mona, msg->buffer, msg_size);
if(na_ret != NA_SUCCESS) goto finish;
// XXX how do we support a source id different from 0 ?
na_ret = mona_msg_send_expected(mona, msg->buffer, msg_size,
msg->plugin_data, recv_addr, 0, recv_tag);
if(na_ret != NA_SUCCESS) goto finish;
if(actual_size)
*actual_size = recv_size;
if(actual_tag)
*actual_tag = recv_tag;
if(actual_src)
*actual_src = recv_addr;
else
mona_addr_free(mona, recv_addr);
recv_addr = NA_ADDR_NULL;
finish:
if(recv_addr != NA_ADDR_NULL)
mona_addr_free(mona, recv_addr);
if(remote_handle != NA_MEM_HANDLE_NULL)
mona_mem_handle_free(mona, remote_handle);
return_msg_to_cache(mona, msg);
return na_ret;
}
struct irecv_mem_args {
mona_instance_t mona;
na_mem_handle_t mem;
na_size_t size;
na_size_t offset;
na_addr_t src;
na_tag_t tag;
na_size_t* actual_size;
na_addr_t* actual_src;
na_tag_t* actual_tag;
mona_request_t req;
};
static void irecv_mem_thread(void* x)
{
struct irecv_mem_args* args = (struct irecv_mem_args*)x;
na_return_t na_ret = mona_recv_mem(
args->mona,
args->mem,
args->size,
args->offset,
args->src,
args->tag,
args->actual_size,
args->actual_src,
args->actual_tag);
ABT_eventual_set(args->req->eventual, &na_ret, sizeof(na_ret));
free(args);
}
na_return_t mona_irecv_mem(
mona_instance_t mona,
na_mem_handle_t mem,
na_size_t size,
na_size_t offset,
na_addr_t src,
na_tag_t tag,
na_size_t* actual_size,
na_addr_t* actual_src,
na_tag_t* actual_tag,
mona_request_t* req)
{
ABT_eventual eventual;
int ret = ABT_eventual_create(sizeof(na_return_t), &eventual);
if(ret != 0)
return NA_NOMEM;
struct irecv_mem_args* args = (struct irecv_mem_args*)malloc(sizeof(*args));
args->mona = mona;
args->mem = mem;
args->size = size;
args->offset = offset;
args->src = src;
args->actual_size = actual_size;
args->actual_src = actual_src;
args->actual_tag = actual_tag;
args->tag = tag;
mona_request_t tmp_req = get_req_from_cache(mona);
args->req = tmp_req;
tmp_req->eventual = eventual;
ret = ABT_thread_create(mona->progress_pool, irecv_mem_thread, args, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS) {
return_req_to_cache(mona, tmp_req);
return NA_NOMEM;
} else {
*req = tmp_req;
ABT_thread_yield();
}
return NA_SUCCESS;
}
// ------------------------------------------------------------------------------------
// Mona low-level unexpected send/recv logic
// ------------------------------------------------------------------------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment