Commit 382b04c4 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Cutting ACC and GACC messages.



In this patch, we define the size of streaming unit the same
as the SRBuf size (256 * 1024 bytes), and cut the ACC/GACC packet
according to this size. The streaming unit always contains
complete basic type data and does not contain partial basic
type data.

Note that we also increment the ref counter of the pointer
to the derived datatype since multiple streaming units within
one RMA operation will refer to it.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent d3cbeab3
......@@ -10,6 +10,9 @@
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
/* define ACC stream size as the SRBuf size */
#define MPIDI_CH3U_Acc_stream_size MPIDI_CH3U_SRBuf_size
/* =========================================================== */
/* auxiliary functions */
/* =========================================================== */
......@@ -636,49 +639,102 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIDI_VC_t *vc = NULL;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_accum_t *accum_pkt = &rma_op->pkt.accum;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int i, j;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_extent, predefined_dtp_count;
MPI_Aint total_len, rest_len;
MPI_Aint origin_dtp_size;
MPID_Datatype *origin_dtp_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_ACC_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_ACC_OP);
accum_pkt->flags |= flags;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
MPID_Request *curr_req = NULL;
accum_pkt->flags |= flags;
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, accum_pkt, sizeof(*accum_pkt), &curr_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (curr_req != NULL) {
MPIU_Assert(rma_op->reqs_size == 0 && rma_op->reqs == NULL);
rma_op->reqs_size = 1;
rma_op->reqs =
(MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
rma_op->reqs[0] = curr_req;
win_ptr->active_req_cnt++;
}
goto fn_exit;
}
/* Get total length of origin data */
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
total_len = origin_dtp_size * rma_op->origin_count;
/* Get size and count for predefined datatype elements */
if (MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
predefined_dtp_size = origin_dtp_size;
predefined_dtp_count = rma_op->origin_count;
MPID_Datatype_get_extent_macro(rma_op->origin_datatype, predefined_dtp_extent);
}
else {
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp_ptr);
MPIU_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->eltype != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(origin_dtp_ptr->eltype, predefined_dtp_size);
predefined_dtp_count = total_len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(origin_dtp_ptr->eltype, predefined_dtp_extent);
}
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
/* Calculate number of predefined elements in each stream unit, and
* total number of stream units. */
stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
rest_len = total_len;
for (j = 0; j < stream_unit_count; j++) {
MPIDI_msg_sz_t stream_offset, stream_size;
MPI_Aint origin_type_size;
MPID_Request *curr_req = NULL;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
accum_pkt->flags |= flags;
stream_offset = 0;
stream_size = origin_type_size * rma_op->origin_count;
stream_offset = j * stream_elem_count * predefined_dtp_size;
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
if (curr_req != NULL) {
rma_op->reqs_size = 1;
if (curr_req != NULL) {
if (rma_op->reqs_size == 0) {
MPIU_Assert(rma_op->reqs == NULL);
rma_op->reqs_size = stream_unit_count;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
rma_op->reqs =
(MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
}
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
}
rma_op->reqs[j] = curr_req;
win_ptr->active_req_cnt++;
}
} /* end of for loop */
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_ACC_OP);
......@@ -704,102 +760,176 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIDI_VC_t *vc = NULL;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &rma_op->pkt.get_accum;
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int i, j;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
MPI_Aint total_len, rest_len;
MPI_Aint origin_dtp_size;
MPID_Datatype *origin_dtp_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_GET_ACC_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_GET_ACC_OP);
rma_op->reqs_size = 1;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
if (rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
* response comes from the target, it will contain the request handle. */
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
get_accum_pkt->flags |= flags;
MPIU_Object_set_ref(resp_req, 2);
rma_op->reqs_size = 1;
resp_req->dev.user_buf = rma_op->result_addr;
resp_req->dev.user_count = rma_op->result_count;
resp_req->dev.datatype = rma_op->result_datatype;
resp_req->dev.target_win_handle = MPI_WIN_NULL;
resp_req->dev.source_win_handle = win_ptr->handle;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
if (!MPIR_DATATYPE_IS_PREDEFINED(resp_req->dev.datatype)) {
MPID_Datatype *result_dtp = NULL;
MPID_Datatype_get_ptr(resp_req->dev.datatype, result_dtp);
resp_req->dev.datatype_ptr = result_dtp;
/* this will cause the datatype to be freed when the
* request is freed. */
}
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
* response comes from the target, it will contain the request handle. */
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
/* Note: Get_accumulate uses the same packet type as accumulate */
get_accum_pkt->request_handle = resp_req->handle;
MPIU_Object_set_ref(resp_req, 2);
get_accum_pkt->flags |= flags;
resp_req->dev.user_buf = rma_op->result_addr;
resp_req->dev.user_count = rma_op->result_count;
resp_req->dev.datatype = rma_op->result_datatype;
resp_req->dev.target_win_handle = MPI_WIN_NULL;
resp_req->dev.source_win_handle = win_ptr->handle;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
/* Note: Get_accumulate uses the same packet type as accumulate */
get_accum_pkt->request_handle = resp_req->handle;
if (rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, get_accum_pkt, sizeof(*get_accum_pkt), &curr_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (curr_req != NULL) {
MPID_Request_release(curr_req);
curr_req = resp_req;
}
else {
curr_req = resp_req;
}
/* For error checking */
resp_req = NULL;
rma_op->reqs[0] = curr_req;
win_ptr->active_req_cnt++;
goto fn_exit;
}
/* Get total length of origin data */
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
total_len = origin_dtp_size * rma_op->origin_count;
/* Get size and count for predefined datatype elements */
if (MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
predefined_dtp_size = origin_dtp_size;
predefined_dtp_count = rma_op->origin_count;
MPID_Datatype_get_extent_macro(rma_op->origin_datatype, predefined_dtp_extent);
}
else {
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp_ptr);
MPIU_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->eltype != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(origin_dtp_ptr->eltype, predefined_dtp_size);
predefined_dtp_count = total_len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(origin_dtp_ptr->eltype, predefined_dtp_extent);
}
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
/* Calculate number of predefined elements in each stream unit, and
* total number of stream units. */
stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
rest_len = total_len;
rma_op->reqs_size = stream_unit_count;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
for (j = 0; j < stream_unit_count; j++) {
MPIDI_msg_sz_t stream_offset, stream_size;
MPI_Aint origin_type_size;
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
get_accum_pkt->flags |= flags;
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
* response comes from the target, it will contain the request handle. */
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(resp_req, 2);
resp_req->dev.user_buf = rma_op->result_addr;
resp_req->dev.user_count = rma_op->result_count;
resp_req->dev.datatype = rma_op->result_datatype;
resp_req->dev.target_win_handle = MPI_WIN_NULL;
resp_req->dev.source_win_handle = win_ptr->handle;
if (!MPIR_DATATYPE_IS_PREDEFINED(resp_req->dev.datatype)) {
MPID_Datatype *result_dtp = NULL;
MPID_Datatype_get_ptr(resp_req->dev.datatype, result_dtp);
resp_req->dev.datatype_ptr = result_dtp;
/* this will cause the datatype to be freed when the
* request is freed. */
}
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
/* Note: Get_accumulate uses the same packet type as accumulate */
get_accum_pkt->request_handle = resp_req->handle;
stream_offset = 0;
stream_size = origin_type_size * rma_op->origin_count;
stream_offset = j * stream_elem_count * predefined_dtp_size;
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
/* This operation can generate two requests; one for inbound and one for
* outbound data. */
if (curr_req != NULL) {
/* If we have both inbound and outbound requests (i.e. GACC
* operation), we need to ensure that the source buffer is
* available and that the response data has been received before
* informing the origin that this operation is complete. Because
* the update needs to be done atomically at the target, they will
* not send back data until it has been received. Therefore,
* completion of the response request implies that the send request
* has completed.
*
* Therefore: refs on the response request are set to two: one is
* held by the progress engine and the other by the RMA op
* completion code. Refs on the outbound request are set to one;
* it will be completed by the progress engine.
*/
MPID_Request_release(curr_req);
curr_req = resp_req;
}
else {
curr_req = resp_req;
}
/* This operation can generate two requests; one for inbound and one for
* outbound data. */
if (curr_req != NULL) {
/* If we have both inbound and outbound requests (i.e. GACC
* operation), we need to ensure that the source buffer is
* available and that the response data has been received before
* informing the origin that this operation is complete. Because
* the update needs to be done atomically at the target, they will
* not send back data until it has been received. Therefore,
* completion of the response request implies that the send request
* has completed.
*
* Therefore: refs on the response request are set to two: one is
* held by the progress engine and the other by the RMA op
* completion code. Refs on the outbound request are set to one;
* it will be completed by the progress engine.
*/
MPID_Request_release(curr_req);
curr_req = resp_req;
}
else {
curr_req = resp_req;
}
/* For error checking */
resp_req = NULL;
/* For error checking */
resp_req = NULL;
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
rma_op->reqs[j] = curr_req;
win_ptr->active_req_cnt++;
} /* end of for loop */
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_GET_ACC_OP);
......
......@@ -508,6 +508,10 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
size_t immed_len, len;
int use_immed_pkt = FALSE;
int is_origin_contig, is_target_contig;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
MPID_Datatype *origin_dtp = NULL, *target_dtp = NULL;
int i;
/* queue it up */
mpi_errno = MPIDI_CH3I_Win_get_op(win_ptr, &new_ptr);
......@@ -531,19 +535,47 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
/* if source or target datatypes are derived, increment their
* reference counts */
if (!MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
MPID_Datatype_get_ptr(origin_datatype, dtp);
MPID_Datatype_add_ref(dtp);
MPID_Datatype_get_ptr(origin_datatype, origin_dtp);
new_ptr->is_dt = 1;
}
if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
MPID_Datatype_get_ptr(target_datatype, dtp);
MPID_Datatype_add_ref(dtp);
MPID_Datatype_get_ptr(target_datatype, target_dtp);
new_ptr->is_dt = 1;
}
MPID_Datatype_get_size_macro(origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, origin_count * origin_type_size, size_t);
/* Get size and count for predefined datatype elements */
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
predefined_dtp_size = origin_type_size;
predefined_dtp_count = origin_count;
MPID_Datatype_get_extent_macro(origin_datatype, predefined_dtp_extent);
}
else {
MPIU_Assert(origin_dtp->eltype != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(origin_dtp->eltype, predefined_dtp_size);
predefined_dtp_count = len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(origin_dtp->eltype, predefined_dtp_extent);
}
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 &&
predefined_dtp_extent > 0);
/* Calculate number of predefined elements in each stream unit, and
* total number of stream units. */
stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
for (i = 0; i < stream_unit_count; i++) {
if (origin_dtp != NULL) {
MPID_Datatype_add_ref(origin_dtp);
}
if (target_dtp != NULL) {
MPID_Datatype_add_ref(target_dtp);
}
}
MPID_Datatype_is_contig(origin_datatype, &is_origin_contig);
MPID_Datatype_is_contig(target_datatype, &is_target_contig);
......@@ -790,6 +822,10 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
size_t immed_len, orig_len;
int use_immed_pkt = FALSE;
int is_origin_contig, is_target_contig, is_result_contig;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
MPID_Datatype *origin_dtp = NULL, *target_dtp = NULL, *result_dtp = NULL;
int i;
/******************** Setting operation struct areas ***********************/
......@@ -809,24 +845,54 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
/* if source or target datatypes are derived, increment their
* reference counts */
if (!MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
MPID_Datatype_get_ptr(origin_datatype, dtp);
MPID_Datatype_add_ref(dtp);
MPID_Datatype_get_ptr(origin_datatype, origin_dtp);
new_ptr->is_dt = 1;
}
if (!MPIR_DATATYPE_IS_PREDEFINED(result_datatype)) {
MPID_Datatype_get_ptr(result_datatype, dtp);
MPID_Datatype_add_ref(dtp);
MPID_Datatype_get_ptr(result_datatype, target_dtp);
new_ptr->is_dt = 1;
}
if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
MPID_Datatype_get_ptr(target_datatype, dtp);
MPID_Datatype_add_ref(dtp);
MPID_Datatype_get_ptr(target_datatype, result_dtp);
new_ptr->is_dt = 1;
}
MPID_Datatype_get_size_macro(origin_datatype, origin_type_size);
MPIU_Assign_trunc(orig_len, origin_count * origin_type_size, size_t);
/* Get size and count for predefined datatype elements */
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
predefined_dtp_size = origin_type_size;
predefined_dtp_count = origin_count;
MPID_Datatype_get_extent_macro(origin_datatype, predefined_dtp_extent);
}
else {
MPIU_Assert(origin_dtp->eltype != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(origin_dtp->eltype, predefined_dtp_size);
predefined_dtp_count = orig_len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(origin_dtp->eltype, predefined_dtp_extent);
}
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 &&
predefined_dtp_extent > 0);
/* Calculate number of predefined elements in each stream unit, and
* total number of stream units. */
stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
for (i = 0; i < stream_unit_count; i++) {
if (origin_dtp != NULL) {
MPID_Datatype_add_ref(origin_dtp);
}
if (target_dtp != NULL) {
MPID_Datatype_add_ref(target_dtp);
}
if (result_dtp != NULL) {
MPID_Datatype_add_ref(result_dtp);
}
}
MPID_Datatype_is_contig(origin_datatype, &is_origin_contig);
MPID_Datatype_is_contig(target_datatype, &is_target_contig);
MPID_Datatype_is_contig(result_datatype, &is_result_contig);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment