Commit 0d5146ba authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Modify do_accumulate_op to allow for packed basic type data as input.



Originally, do_accumulate_op() is used to perform the ACC
computation on target between data from origin side and
data on the target window. It requires that the target side
must first unpack the received origin data into the same data
layout as the target data before calling this function, which
may consume potentially large of memory.

This patch fixes do_accumulate_op() function in the following
aspects:

(1) It requires that the origin data passed to the function
must be "in a packed manner", which means it looks as if all
basic type elements in the origin data is placed one by one.
Note that the origin data is not necessarily contiguous, since
we may use non-contiguous basic type. If the basic type
is contiguous, then the origin data must be contiguous.

(2) It adds a new function argument, stream_offset, which
specifies a starting location in the target data. This allows
the origin data to work with partial of target data with stream
size.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent c9435750
......@@ -761,39 +761,54 @@ static inline int MPIDI_CH3I_RMA_Handle_flush_ack(MPID_Win * win_ptr, int target
#define FUNCNAME do_accumulate_op
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int do_accumulate_op(void *source_buf, void *target_buf,
int acc_count, MPI_Datatype acc_dtp, MPI_Op acc_op)
static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp,
void *target_buf, int target_count, MPI_Datatype target_dtp,
MPI_Aint stream_offset, MPI_Op acc_op)
{
int mpi_errno = MPI_SUCCESS;
MPI_User_function *uop;
MPI_User_function *uop = NULL;
MPI_Aint source_dtp_size, source_dtp_extent;
MPIDI_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP);
MPIDI_FUNC_ENTER(MPID_STATE_DO_ACCUMULATE_OP);
if (acc_op == MPI_REPLACE) {
/* simply copy the data */
mpi_errno = MPIR_Localcopy(source_buf, acc_count, acc_dtp, target_buf, acc_count, acc_dtp);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(source_dtp));
MPID_Datatype_get_size_macro(source_dtp, source_dtp_size);
MPID_Datatype_get_extent_macro(source_dtp, source_dtp_extent);
if (acc_op != MPI_REPLACE) {
if (HANDLE_GET_KIND(acc_op) == HANDLE_KIND_BUILTIN) {
/* get the function by indexing into the op table */
uop = MPIR_OP_HDL_TO_FN(acc_op);
}
else {
/* --BEGIN ERROR HANDLING-- */
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
FCNAME, __LINE__, MPI_ERR_OP,
"**opnotpredefined", "**opnotpredefined %d", acc_op);
return mpi_errno;
/* --END ERROR HANDLING-- */
}
goto fn_exit;
}
if (HANDLE_GET_KIND(acc_op) == HANDLE_KIND_BUILTIN) {
/* get the function by indexing into the op table */
uop = MPIR_OP_HDL_TO_FN(acc_op);
}
else {
/* --BEGIN ERROR HANDLING-- */
mpi_errno =
MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OP,
"**opnotpredefined", "**opnotpredefined %d", acc_op);
return mpi_errno;
/* --END ERROR HANDLING-- */
}
if (MPIR_DATATYPE_IS_PREDEFINED(acc_dtp)) {
(*uop) (source_buf, target_buf, &acc_count, &acc_dtp);
if (MPIR_DATATYPE_IS_PREDEFINED(target_dtp)) {
/* apply op if target dtp is predefined dtp */
MPIU_Assert(source_dtp == target_dtp);
MPI_Aint real_stream_offset = (stream_offset / source_dtp_size) * source_dtp_extent;
void *curr_target_buf = (void *) ((char *) target_buf + real_stream_offset);
if (acc_op == MPI_REPLACE) {
mpi_errno = MPIR_Localcopy(source_buf, source_count, source_dtp,
curr_target_buf, source_count, source_dtp);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
else {
(*uop) (source_buf, curr_target_buf, &source_count, &source_dtp);
}
}
else {
/* derived datatype */
......@@ -806,6 +821,7 @@ static inline int do_accumulate_op(void *source_buf, void *target_buf,
MPID_Datatype *dtp;
MPI_Aint curr_len;
void *curr_loc;
int accumulated_count;
segp = MPID_Segment_alloc();
/* --BEGIN ERROR HANDLING-- */
......@@ -817,12 +833,12 @@ static inline int do_accumulate_op(void *source_buf, void *target_buf,
return mpi_errno;
}
/* --END ERROR HANDLING-- */
MPID_Segment_init(NULL, acc_count, acc_dtp, segp, 0);
first = 0;
last = SEGMENT_IGNORE_LAST;
MPID_Segment_init(NULL, target_count, target_dtp, segp, 0);
first = stream_offset;
last = first + source_count * source_dtp_size;
MPID_Datatype_get_ptr(acc_dtp, dtp);
vec_len = dtp->max_contig_blocks * acc_count + 1;
MPID_Datatype_get_ptr(target_dtp, dtp);
vec_len = dtp->max_contig_blocks * target_count + 1;
/* +1 needed because Rob says so */
dloop_vec = (DLOOP_VECTOR *)
MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
......@@ -841,12 +857,14 @@ static inline int do_accumulate_op(void *source_buf, void *target_buf,
type = dtp->eltype;
MPIU_Assert(type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(type, type_size);
MPID_Datatype_get_extent_macro(type, type_extent);
MPIU_Assert(type == source_dtp);
type_size = source_dtp_size;
type_extent = source_dtp_extent;
i = 0;
curr_loc = dloop_vec[0].DLOOP_VECTOR_BUF;
curr_len = dloop_vec[0].DLOOP_VECTOR_LEN;
accumulated_count = 0;
while (i != vec_len) {
if (curr_len < type_size) {
MPIU_Assert(i != vec_len);
......@@ -856,8 +874,19 @@ static inline int do_accumulate_op(void *source_buf, void *target_buf,
}
MPIU_Assign_trunc(count, curr_len / type_size, int);
(*uop) ((char *) source_buf + MPIU_PtrToAint(curr_loc),
(char *) target_buf + MPIU_PtrToAint(curr_loc), &count, &type);
if (acc_op == MPI_REPLACE) {
mpi_errno = MPIR_Localcopy((char *) source_buf + type_extent * accumulated_count,
count, type,
(char *) target_buf + MPIU_PtrToAint(curr_loc),
count, type);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
else {
(*uop) ((char *) source_buf + type_extent * accumulated_count,
(char *) target_buf + MPIU_PtrToAint(curr_loc), &count, &type);
}
if (curr_len % type_size == 0) {
i++;
......@@ -870,6 +899,8 @@ static inline int do_accumulate_op(void *source_buf, void *target_buf,
curr_loc = (void *) ((char *) curr_loc + type_extent * count);
curr_len -= type_size * count;
}
accumulated_count += count;
}
MPID_Segment_free(segp);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment